diff --git a/AGENTS.md b/AGENTS.md deleted file mode 100644 index aa0426a0..00000000 --- a/AGENTS.md +++ /dev/null @@ -1,29 +0,0 @@ - - - - - -## BACKLOG WORKFLOW INSTRUCTIONS - -This project uses Backlog.md MCP for all task and project management activities. - -**CRITICAL GUIDANCE** - -- If your client supports MCP resources, read `backlog://workflow/overview` to understand when and how to use Backlog for this project. -- If your client only supports tools or the above request fails, call `backlog.get_backlog_instructions()` to load the tool-oriented overview. Use the `instruction` selector when you need `task-creation`, `task-execution`, or `task-finalization`. - -- **First time working here?** Read the overview resource IMMEDIATELY to learn the workflow -- **Already familiar?** You should have the overview cached ("## Backlog.md Overview (MCP)") -- **When to read it**: BEFORE creating tasks, or when you're unsure whether to track work - -These guides cover: -- Decision framework for when to create tasks -- Search-first workflow to avoid duplicates -- Links to detailed guides for task creation, execution, and finalization -- MCP tools reference - -You MUST read the overview resource to understand the complete workflow. The information is NOT summarized here. - - - - diff --git a/CLAUDE.md b/CLAUDE.md index f88a59d5..2dabf532 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,33 +1,4 @@ - - - - -## BACKLOG WORKFLOW INSTRUCTIONS - -This project uses Backlog.md MCP for all task and project management activities. - -**CRITICAL GUIDANCE** - -- If your client supports MCP resources, read `backlog://workflow/overview` to understand when and how to use Backlog for this project. -- If your client only supports tools or the above request fails, call `backlog.get_backlog_instructions()` to load the tool-oriented overview. Use the `instruction` selector when you need `task-creation`, `task-execution`, or `task-finalization`. - -- **First time working here?** Read the overview resource IMMEDIATELY to learn the workflow -- **Already familiar?** You should have the overview cached ("## Backlog.md Overview (MCP)") -- **When to read it**: BEFORE creating tasks, or when you're unsure whether to track work - -These guides cover: -- Decision framework for when to create tasks -- Search-first workflow to avoid duplicates -- Links to detailed guides for task creation, execution, and finalization -- MCP tools reference - -You MUST read the overview resource to understand the complete workflow. The information is NOT summarized here. - - - - - ## Available Skills Five Claude Code skills are installed in this repo's dev container. Each maps to a phase of the feature lifecycle. diff --git a/asset_list/app.py b/asset_list/app.py index 7413c7cb..9b10d7f3 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -79,23 +79,23 @@ def app(): """ data_folder = "/workspaces/model/asset_list" - data_filename = "input.xlsx" - sheet_name = "Handovers" - postcode_column = "POSTCODE" - address1_column = "Full Addres" + data_filename = "lincs_address_list.xlsx" + sheet_name = "Sheet1" + postcode_column = "Postcode" + address1_column = "Deal Name" address1_method = None - fulladdress_column = "Full Addres" + fulladdress_column = "Deal Name" address_cols_to_concat = [] missing_postcodes_method = None landlord_year_built = None - landlord_os_uprn = "domna_found_uprn" - landlord_property_type = "PROPERTY TYPE" # Good to include if landlord gave - landlord_built_form = "Type Description" # Good to include if landlord gave + landlord_os_uprn = None + landlord_property_type = None # Good to include if landlord gave + landlord_built_form = None # Good to include if landlord gave landlord_wall_construction = None landlord_roof_construction = None landlord_heating_system = None landlord_existing_pv = None - landlord_property_id = "PROP REF" + landlord_property_id = "landlord_id" landlord_sap = None outcomes_filename = None outcomes_sheetname = None @@ -468,9 +468,3 @@ def app(): asset_list.duplicated_addresses.to_excel( writer, sheet_name="Duplicate Properties", index=False ) - - - - -for key,value in dict.items(): - lsakjfldsa \ No newline at end of file diff --git a/infrastructure/terraform/README.md b/deployment/terraform/README.md similarity index 100% rename from infrastructure/terraform/README.md rename to deployment/terraform/README.md diff --git a/infrastructure/terraform/cdn/main.tf b/deployment/terraform/cdn/main.tf similarity index 100% rename from infrastructure/terraform/cdn/main.tf rename to deployment/terraform/cdn/main.tf diff --git a/infrastructure/terraform/cdn/provider.tf b/deployment/terraform/cdn/provider.tf similarity index 100% rename from infrastructure/terraform/cdn/provider.tf rename to deployment/terraform/cdn/provider.tf diff --git a/infrastructure/terraform/cdn/variables.tf b/deployment/terraform/cdn/variables.tf similarity index 100% rename from infrastructure/terraform/cdn/variables.tf rename to deployment/terraform/cdn/variables.tf diff --git a/infrastructure/terraform/cdn_certificate/main.tf b/deployment/terraform/cdn_certificate/main.tf similarity index 100% rename from infrastructure/terraform/cdn_certificate/main.tf rename to deployment/terraform/cdn_certificate/main.tf diff --git a/infrastructure/terraform/cdn_certificate/outputs.tf b/deployment/terraform/cdn_certificate/outputs.tf similarity index 100% rename from infrastructure/terraform/cdn_certificate/outputs.tf rename to deployment/terraform/cdn_certificate/outputs.tf diff --git a/infrastructure/terraform/cdn_certificate/provider.tf b/deployment/terraform/cdn_certificate/provider.tf similarity index 100% rename from infrastructure/terraform/cdn_certificate/provider.tf rename to deployment/terraform/cdn_certificate/provider.tf diff --git a/infrastructure/terraform/cdn_certificate/variables.tf b/deployment/terraform/cdn_certificate/variables.tf similarity index 100% rename from infrastructure/terraform/cdn_certificate/variables.tf rename to deployment/terraform/cdn_certificate/variables.tf diff --git a/infrastructure/terraform/lambda/_template/README.md b/deployment/terraform/lambda/_template/README.md similarity index 100% rename from infrastructure/terraform/lambda/_template/README.md rename to deployment/terraform/lambda/_template/README.md diff --git a/infrastructure/terraform/lambda/_template/main.tf b/deployment/terraform/lambda/_template/main.tf similarity index 100% rename from infrastructure/terraform/lambda/_template/main.tf rename to deployment/terraform/lambda/_template/main.tf diff --git a/infrastructure/terraform/lambda/_template/provider.tf b/deployment/terraform/lambda/_template/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/_template/provider.tf rename to deployment/terraform/lambda/_template/provider.tf diff --git a/infrastructure/terraform/lambda/_template/variables.tf b/deployment/terraform/lambda/_template/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/_template/variables.tf rename to deployment/terraform/lambda/_template/variables.tf diff --git a/infrastructure/terraform/lambda/address2UPRN/main.tf b/deployment/terraform/lambda/address2UPRN/main.tf similarity index 100% rename from infrastructure/terraform/lambda/address2UPRN/main.tf rename to deployment/terraform/lambda/address2UPRN/main.tf diff --git a/infrastructure/terraform/lambda/address2UPRN/outputs.tf b/deployment/terraform/lambda/address2UPRN/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/address2UPRN/outputs.tf rename to deployment/terraform/lambda/address2UPRN/outputs.tf diff --git a/infrastructure/terraform/lambda/address2UPRN/provider.tf b/deployment/terraform/lambda/address2UPRN/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/address2UPRN/provider.tf rename to deployment/terraform/lambda/address2UPRN/provider.tf diff --git a/infrastructure/terraform/lambda/address2UPRN/variables.tf b/deployment/terraform/lambda/address2UPRN/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/address2UPRN/variables.tf rename to deployment/terraform/lambda/address2UPRN/variables.tf diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/main.tf b/deployment/terraform/lambda/bulk_address2uprn_combiner/main.tf similarity index 100% rename from infrastructure/terraform/lambda/bulk_address2uprn_combiner/main.tf rename to deployment/terraform/lambda/bulk_address2uprn_combiner/main.tf diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/outputs.tf b/deployment/terraform/lambda/bulk_address2uprn_combiner/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/bulk_address2uprn_combiner/outputs.tf rename to deployment/terraform/lambda/bulk_address2uprn_combiner/outputs.tf diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/provider.tf b/deployment/terraform/lambda/bulk_address2uprn_combiner/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/bulk_address2uprn_combiner/provider.tf rename to deployment/terraform/lambda/bulk_address2uprn_combiner/provider.tf diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/variables.tf b/deployment/terraform/lambda/bulk_address2uprn_combiner/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/bulk_address2uprn_combiner/variables.tf rename to deployment/terraform/lambda/bulk_address2uprn_combiner/variables.tf diff --git a/infrastructure/terraform/lambda/categorisation/main.tf b/deployment/terraform/lambda/categorisation/main.tf similarity index 100% rename from infrastructure/terraform/lambda/categorisation/main.tf rename to deployment/terraform/lambda/categorisation/main.tf diff --git a/infrastructure/terraform/lambda/categorisation/outputs.tf b/deployment/terraform/lambda/categorisation/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/categorisation/outputs.tf rename to deployment/terraform/lambda/categorisation/outputs.tf diff --git a/infrastructure/terraform/lambda/categorisation/provider.tf b/deployment/terraform/lambda/categorisation/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/categorisation/provider.tf rename to deployment/terraform/lambda/categorisation/provider.tf diff --git a/infrastructure/terraform/lambda/categorisation/variables.tf b/deployment/terraform/lambda/categorisation/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/categorisation/variables.tf rename to deployment/terraform/lambda/categorisation/variables.tf diff --git a/infrastructure/terraform/lambda/condition-etl/main.tf b/deployment/terraform/lambda/condition-etl/main.tf similarity index 100% rename from infrastructure/terraform/lambda/condition-etl/main.tf rename to deployment/terraform/lambda/condition-etl/main.tf diff --git a/infrastructure/terraform/lambda/condition-etl/provider.tf b/deployment/terraform/lambda/condition-etl/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/condition-etl/provider.tf rename to deployment/terraform/lambda/condition-etl/provider.tf diff --git a/infrastructure/terraform/lambda/condition-etl/variables.tf b/deployment/terraform/lambda/condition-etl/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/condition-etl/variables.tf rename to deployment/terraform/lambda/condition-etl/variables.tf diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/main.tf b/deployment/terraform/lambda/ecmk_to_ara/main.tf similarity index 100% rename from infrastructure/terraform/lambda/ecmk_to_ara/main.tf rename to deployment/terraform/lambda/ecmk_to_ara/main.tf diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/provider.tf b/deployment/terraform/lambda/ecmk_to_ara/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/ecmk_to_ara/provider.tf rename to deployment/terraform/lambda/ecmk_to_ara/provider.tf diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/variables.tf b/deployment/terraform/lambda/ecmk_to_ara/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/ecmk_to_ara/variables.tf rename to deployment/terraform/lambda/ecmk_to_ara/variables.tf diff --git a/infrastructure/terraform/lambda/engine/main.tf b/deployment/terraform/lambda/engine/main.tf similarity index 100% rename from infrastructure/terraform/lambda/engine/main.tf rename to deployment/terraform/lambda/engine/main.tf diff --git a/infrastructure/terraform/lambda/engine/outputs.tf b/deployment/terraform/lambda/engine/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/engine/outputs.tf rename to deployment/terraform/lambda/engine/outputs.tf diff --git a/infrastructure/terraform/lambda/engine/provider.tf b/deployment/terraform/lambda/engine/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/engine/provider.tf rename to deployment/terraform/lambda/engine/provider.tf diff --git a/infrastructure/terraform/lambda/engine/variables.tf b/deployment/terraform/lambda/engine/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/engine/variables.tf rename to deployment/terraform/lambda/engine/variables.tf diff --git a/infrastructure/terraform/lambda/fast-api/main.tf b/deployment/terraform/lambda/fast-api/main.tf similarity index 100% rename from infrastructure/terraform/lambda/fast-api/main.tf rename to deployment/terraform/lambda/fast-api/main.tf diff --git a/infrastructure/terraform/lambda/fast-api/outputs.tf b/deployment/terraform/lambda/fast-api/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/fast-api/outputs.tf rename to deployment/terraform/lambda/fast-api/outputs.tf diff --git a/infrastructure/terraform/lambda/fast-api/provider.tf b/deployment/terraform/lambda/fast-api/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/fast-api/provider.tf rename to deployment/terraform/lambda/fast-api/provider.tf diff --git a/infrastructure/terraform/lambda/fast-api/variables.tf b/deployment/terraform/lambda/fast-api/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/fast-api/variables.tf rename to deployment/terraform/lambda/fast-api/variables.tf diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf b/deployment/terraform/lambda/hubspot_deal_etl/main.tf similarity index 100% rename from infrastructure/terraform/lambda/hubspot_deal_etl/main.tf rename to deployment/terraform/lambda/hubspot_deal_etl/main.tf diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/provider.tf b/deployment/terraform/lambda/hubspot_deal_etl/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/hubspot_deal_etl/provider.tf rename to deployment/terraform/lambda/hubspot_deal_etl/provider.tf diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/variables.tf b/deployment/terraform/lambda/hubspot_deal_etl/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/hubspot_deal_etl/variables.tf rename to deployment/terraform/lambda/hubspot_deal_etl/variables.tf diff --git a/infrastructure/terraform/lambda/magic_plan/main.tf b/deployment/terraform/lambda/magic_plan/main.tf similarity index 100% rename from infrastructure/terraform/lambda/magic_plan/main.tf rename to deployment/terraform/lambda/magic_plan/main.tf diff --git a/infrastructure/terraform/lambda/magic_plan/outputs.tf b/deployment/terraform/lambda/magic_plan/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/magic_plan/outputs.tf rename to deployment/terraform/lambda/magic_plan/outputs.tf diff --git a/infrastructure/terraform/lambda/magic_plan/provider.tf b/deployment/terraform/lambda/magic_plan/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/magic_plan/provider.tf rename to deployment/terraform/lambda/magic_plan/provider.tf diff --git a/infrastructure/terraform/lambda/magic_plan/variables.tf b/deployment/terraform/lambda/magic_plan/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/magic_plan/variables.tf rename to deployment/terraform/lambda/magic_plan/variables.tf diff --git a/infrastructure/terraform/lambda/ordnanceSurvey/main.tf b/deployment/terraform/lambda/ordnanceSurvey/main.tf similarity index 100% rename from infrastructure/terraform/lambda/ordnanceSurvey/main.tf rename to deployment/terraform/lambda/ordnanceSurvey/main.tf diff --git a/infrastructure/terraform/lambda/ordnanceSurvey/provider.tf b/deployment/terraform/lambda/ordnanceSurvey/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/ordnanceSurvey/provider.tf rename to deployment/terraform/lambda/ordnanceSurvey/provider.tf diff --git a/infrastructure/terraform/lambda/ordnanceSurvey/variables.tf b/deployment/terraform/lambda/ordnanceSurvey/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/ordnanceSurvey/variables.tf rename to deployment/terraform/lambda/ordnanceSurvey/variables.tf diff --git a/infrastructure/terraform/lambda/pashub_to_ara/main.tf b/deployment/terraform/lambda/pashub_to_ara/main.tf similarity index 100% rename from infrastructure/terraform/lambda/pashub_to_ara/main.tf rename to deployment/terraform/lambda/pashub_to_ara/main.tf diff --git a/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf b/deployment/terraform/lambda/pashub_to_ara/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/pashub_to_ara/outputs.tf rename to deployment/terraform/lambda/pashub_to_ara/outputs.tf diff --git a/infrastructure/terraform/lambda/pashub_to_ara/provider.tf b/deployment/terraform/lambda/pashub_to_ara/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/pashub_to_ara/provider.tf rename to deployment/terraform/lambda/pashub_to_ara/provider.tf diff --git a/infrastructure/terraform/lambda/pashub_to_ara/variables.tf b/deployment/terraform/lambda/pashub_to_ara/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/pashub_to_ara/variables.tf rename to deployment/terraform/lambda/pashub_to_ara/variables.tf diff --git a/infrastructure/terraform/lambda/postcodeSplitter/main.tf b/deployment/terraform/lambda/postcodeSplitter/main.tf similarity index 100% rename from infrastructure/terraform/lambda/postcodeSplitter/main.tf rename to deployment/terraform/lambda/postcodeSplitter/main.tf diff --git a/infrastructure/terraform/lambda/postcodeSplitter/outputs.tf b/deployment/terraform/lambda/postcodeSplitter/outputs.tf similarity index 100% rename from infrastructure/terraform/lambda/postcodeSplitter/outputs.tf rename to deployment/terraform/lambda/postcodeSplitter/outputs.tf diff --git a/infrastructure/terraform/lambda/postcodeSplitter/provider.tf b/deployment/terraform/lambda/postcodeSplitter/provider.tf similarity index 100% rename from infrastructure/terraform/lambda/postcodeSplitter/provider.tf rename to deployment/terraform/lambda/postcodeSplitter/provider.tf diff --git a/infrastructure/terraform/lambda/postcodeSplitter/variables.tf b/deployment/terraform/lambda/postcodeSplitter/variables.tf similarity index 100% rename from infrastructure/terraform/lambda/postcodeSplitter/variables.tf rename to deployment/terraform/lambda/postcodeSplitter/variables.tf diff --git a/infrastructure/terraform/modules/acm_certificate/main.tf b/deployment/terraform/modules/acm_certificate/main.tf similarity index 100% rename from infrastructure/terraform/modules/acm_certificate/main.tf rename to deployment/terraform/modules/acm_certificate/main.tf diff --git a/infrastructure/terraform/modules/acm_certificate/outputs.tf b/deployment/terraform/modules/acm_certificate/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/acm_certificate/outputs.tf rename to deployment/terraform/modules/acm_certificate/outputs.tf diff --git a/infrastructure/terraform/modules/acm_certificate/variables.tf b/deployment/terraform/modules/acm_certificate/variables.tf similarity index 100% rename from infrastructure/terraform/modules/acm_certificate/variables.tf rename to deployment/terraform/modules/acm_certificate/variables.tf diff --git a/infrastructure/terraform/modules/cloudfront/main.tf b/deployment/terraform/modules/cloudfront/main.tf similarity index 100% rename from infrastructure/terraform/modules/cloudfront/main.tf rename to deployment/terraform/modules/cloudfront/main.tf diff --git a/infrastructure/terraform/modules/cloudfront/variables.tf b/deployment/terraform/modules/cloudfront/variables.tf similarity index 100% rename from infrastructure/terraform/modules/cloudfront/variables.tf rename to deployment/terraform/modules/cloudfront/variables.tf diff --git a/infrastructure/terraform/modules/container_registry/main.tf b/deployment/terraform/modules/container_registry/main.tf similarity index 100% rename from infrastructure/terraform/modules/container_registry/main.tf rename to deployment/terraform/modules/container_registry/main.tf diff --git a/infrastructure/terraform/modules/container_registry/outputs.tf b/deployment/terraform/modules/container_registry/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/container_registry/outputs.tf rename to deployment/terraform/modules/container_registry/outputs.tf diff --git a/infrastructure/terraform/modules/container_registry/variables.tf b/deployment/terraform/modules/container_registry/variables.tf similarity index 100% rename from infrastructure/terraform/modules/container_registry/variables.tf rename to deployment/terraform/modules/container_registry/variables.tf diff --git a/infrastructure/terraform/modules/ecr/main.tf b/deployment/terraform/modules/ecr/main.tf similarity index 100% rename from infrastructure/terraform/modules/ecr/main.tf rename to deployment/terraform/modules/ecr/main.tf diff --git a/infrastructure/terraform/modules/ecr/outputs.tf b/deployment/terraform/modules/ecr/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/ecr/outputs.tf rename to deployment/terraform/modules/ecr/outputs.tf diff --git a/infrastructure/terraform/modules/ecr/variables.tf b/deployment/terraform/modules/ecr/variables.tf similarity index 100% rename from infrastructure/terraform/modules/ecr/variables.tf rename to deployment/terraform/modules/ecr/variables.tf diff --git a/infrastructure/terraform/modules/general_iam_policy/main.tf b/deployment/terraform/modules/general_iam_policy/main.tf similarity index 100% rename from infrastructure/terraform/modules/general_iam_policy/main.tf rename to deployment/terraform/modules/general_iam_policy/main.tf diff --git a/infrastructure/terraform/modules/general_iam_policy/outputs.tf b/deployment/terraform/modules/general_iam_policy/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/general_iam_policy/outputs.tf rename to deployment/terraform/modules/general_iam_policy/outputs.tf diff --git a/infrastructure/terraform/modules/general_iam_policy/variables.tf b/deployment/terraform/modules/general_iam_policy/variables.tf similarity index 100% rename from infrastructure/terraform/modules/general_iam_policy/variables.tf rename to deployment/terraform/modules/general_iam_policy/variables.tf diff --git a/infrastructure/terraform/modules/lambda_execution_role/main.tf b/deployment/terraform/modules/lambda_execution_role/main.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_execution_role/main.tf rename to deployment/terraform/modules/lambda_execution_role/main.tf diff --git a/infrastructure/terraform/modules/lambda_execution_role/outputs.tf b/deployment/terraform/modules/lambda_execution_role/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_execution_role/outputs.tf rename to deployment/terraform/modules/lambda_execution_role/outputs.tf diff --git a/infrastructure/terraform/modules/lambda_execution_role/variables.tf b/deployment/terraform/modules/lambda_execution_role/variables.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_execution_role/variables.tf rename to deployment/terraform/modules/lambda_execution_role/variables.tf diff --git a/infrastructure/terraform/modules/lambda_service/main.tf b/deployment/terraform/modules/lambda_service/main.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_service/main.tf rename to deployment/terraform/modules/lambda_service/main.tf diff --git a/infrastructure/terraform/modules/lambda_service/outputs.tf b/deployment/terraform/modules/lambda_service/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_service/outputs.tf rename to deployment/terraform/modules/lambda_service/outputs.tf diff --git a/infrastructure/terraform/modules/lambda_service/variables.tf b/deployment/terraform/modules/lambda_service/variables.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_service/variables.tf rename to deployment/terraform/modules/lambda_service/variables.tf diff --git a/infrastructure/terraform/modules/lambda_service_zip/main.tf b/deployment/terraform/modules/lambda_service_zip/main.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_service_zip/main.tf rename to deployment/terraform/modules/lambda_service_zip/main.tf diff --git a/infrastructure/terraform/modules/lambda_service_zip/variables.tf b/deployment/terraform/modules/lambda_service_zip/variables.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_service_zip/variables.tf rename to deployment/terraform/modules/lambda_service_zip/variables.tf diff --git a/infrastructure/terraform/modules/lambda_sqs_trigger/main.tf b/deployment/terraform/modules/lambda_sqs_trigger/main.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_sqs_trigger/main.tf rename to deployment/terraform/modules/lambda_sqs_trigger/main.tf diff --git a/infrastructure/terraform/modules/lambda_sqs_trigger/variables.tf b/deployment/terraform/modules/lambda_sqs_trigger/variables.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_sqs_trigger/variables.tf rename to deployment/terraform/modules/lambda_sqs_trigger/variables.tf diff --git a/infrastructure/terraform/modules/lambda_with_api_gateway/main.tf b/deployment/terraform/modules/lambda_with_api_gateway/main.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_with_api_gateway/main.tf rename to deployment/terraform/modules/lambda_with_api_gateway/main.tf diff --git a/infrastructure/terraform/modules/lambda_with_api_gateway/outputs.tf b/deployment/terraform/modules/lambda_with_api_gateway/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_with_api_gateway/outputs.tf rename to deployment/terraform/modules/lambda_with_api_gateway/outputs.tf diff --git a/infrastructure/terraform/modules/lambda_with_api_gateway/variables.tf b/deployment/terraform/modules/lambda_with_api_gateway/variables.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_with_api_gateway/variables.tf rename to deployment/terraform/modules/lambda_with_api_gateway/variables.tf diff --git a/infrastructure/terraform/modules/lambda_with_sqs/main.tf b/deployment/terraform/modules/lambda_with_sqs/main.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_with_sqs/main.tf rename to deployment/terraform/modules/lambda_with_sqs/main.tf diff --git a/infrastructure/terraform/modules/lambda_with_sqs/outputs.tf b/deployment/terraform/modules/lambda_with_sqs/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_with_sqs/outputs.tf rename to deployment/terraform/modules/lambda_with_sqs/outputs.tf diff --git a/infrastructure/terraform/modules/lambda_with_sqs/variables.tf b/deployment/terraform/modules/lambda_with_sqs/variables.tf similarity index 100% rename from infrastructure/terraform/modules/lambda_with_sqs/variables.tf rename to deployment/terraform/modules/lambda_with_sqs/variables.tf diff --git a/infrastructure/terraform/modules/route53/main.tf b/deployment/terraform/modules/route53/main.tf similarity index 100% rename from infrastructure/terraform/modules/route53/main.tf rename to deployment/terraform/modules/route53/main.tf diff --git a/infrastructure/terraform/modules/route53/variables.tf b/deployment/terraform/modules/route53/variables.tf similarity index 100% rename from infrastructure/terraform/modules/route53/variables.tf rename to deployment/terraform/modules/route53/variables.tf diff --git a/infrastructure/terraform/modules/s3/main.tf b/deployment/terraform/modules/s3/main.tf similarity index 100% rename from infrastructure/terraform/modules/s3/main.tf rename to deployment/terraform/modules/s3/main.tf diff --git a/infrastructure/terraform/modules/s3/outputs.tf b/deployment/terraform/modules/s3/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/s3/outputs.tf rename to deployment/terraform/modules/s3/outputs.tf diff --git a/infrastructure/terraform/modules/s3/variables.tf b/deployment/terraform/modules/s3/variables.tf similarity index 100% rename from infrastructure/terraform/modules/s3/variables.tf rename to deployment/terraform/modules/s3/variables.tf diff --git a/infrastructure/terraform/modules/s3_iam_policy/main.tf b/deployment/terraform/modules/s3_iam_policy/main.tf similarity index 100% rename from infrastructure/terraform/modules/s3_iam_policy/main.tf rename to deployment/terraform/modules/s3_iam_policy/main.tf diff --git a/infrastructure/terraform/modules/s3_iam_policy/outputs.tf b/deployment/terraform/modules/s3_iam_policy/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/s3_iam_policy/outputs.tf rename to deployment/terraform/modules/s3_iam_policy/outputs.tf diff --git a/infrastructure/terraform/modules/s3_iam_policy/variables.tf b/deployment/terraform/modules/s3_iam_policy/variables.tf similarity index 100% rename from infrastructure/terraform/modules/s3_iam_policy/variables.tf rename to deployment/terraform/modules/s3_iam_policy/variables.tf diff --git a/infrastructure/terraform/modules/s3_presignable_bucket/main.tf b/deployment/terraform/modules/s3_presignable_bucket/main.tf similarity index 100% rename from infrastructure/terraform/modules/s3_presignable_bucket/main.tf rename to deployment/terraform/modules/s3_presignable_bucket/main.tf diff --git a/infrastructure/terraform/modules/s3_presignable_bucket/outputs.tf b/deployment/terraform/modules/s3_presignable_bucket/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/s3_presignable_bucket/outputs.tf rename to deployment/terraform/modules/s3_presignable_bucket/outputs.tf diff --git a/infrastructure/terraform/modules/s3_presignable_bucket/variables.tf b/deployment/terraform/modules/s3_presignable_bucket/variables.tf similarity index 100% rename from infrastructure/terraform/modules/s3_presignable_bucket/variables.tf rename to deployment/terraform/modules/s3_presignable_bucket/variables.tf diff --git a/infrastructure/terraform/modules/ses/main.tf b/deployment/terraform/modules/ses/main.tf similarity index 100% rename from infrastructure/terraform/modules/ses/main.tf rename to deployment/terraform/modules/ses/main.tf diff --git a/infrastructure/terraform/modules/ses/outputs.tf b/deployment/terraform/modules/ses/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/ses/outputs.tf rename to deployment/terraform/modules/ses/outputs.tf diff --git a/infrastructure/terraform/modules/ses/variables.tf b/deployment/terraform/modules/ses/variables.tf similarity index 100% rename from infrastructure/terraform/modules/ses/variables.tf rename to deployment/terraform/modules/ses/variables.tf diff --git a/infrastructure/terraform/modules/sqs_queue/main.tf b/deployment/terraform/modules/sqs_queue/main.tf similarity index 100% rename from infrastructure/terraform/modules/sqs_queue/main.tf rename to deployment/terraform/modules/sqs_queue/main.tf diff --git a/infrastructure/terraform/modules/sqs_queue/outputs.tf b/deployment/terraform/modules/sqs_queue/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/sqs_queue/outputs.tf rename to deployment/terraform/modules/sqs_queue/outputs.tf diff --git a/infrastructure/terraform/modules/sqs_queue/variables.tf b/deployment/terraform/modules/sqs_queue/variables.tf similarity index 100% rename from infrastructure/terraform/modules/sqs_queue/variables.tf rename to deployment/terraform/modules/sqs_queue/variables.tf diff --git a/infrastructure/terraform/modules/tf_state_bucket/main.tf b/deployment/terraform/modules/tf_state_bucket/main.tf similarity index 100% rename from infrastructure/terraform/modules/tf_state_bucket/main.tf rename to deployment/terraform/modules/tf_state_bucket/main.tf diff --git a/infrastructure/terraform/modules/tf_state_bucket/outputs.tf b/deployment/terraform/modules/tf_state_bucket/outputs.tf similarity index 100% rename from infrastructure/terraform/modules/tf_state_bucket/outputs.tf rename to deployment/terraform/modules/tf_state_bucket/outputs.tf diff --git a/infrastructure/terraform/modules/tf_state_bucket/variables.tf b/deployment/terraform/modules/tf_state_bucket/variables.tf similarity index 100% rename from infrastructure/terraform/modules/tf_state_bucket/variables.tf rename to deployment/terraform/modules/tf_state_bucket/variables.tf diff --git a/infrastructure/terraform/shared/dev.tfvars b/deployment/terraform/shared/dev.tfvars similarity index 100% rename from infrastructure/terraform/shared/dev.tfvars rename to deployment/terraform/shared/dev.tfvars diff --git a/infrastructure/terraform/shared/main.tf b/deployment/terraform/shared/main.tf similarity index 100% rename from infrastructure/terraform/shared/main.tf rename to deployment/terraform/shared/main.tf diff --git a/infrastructure/terraform/shared/secrets.tf b/deployment/terraform/shared/secrets.tf similarity index 100% rename from infrastructure/terraform/shared/secrets.tf rename to deployment/terraform/shared/secrets.tf diff --git a/infrastructure/terraform/shared/variables.tf b/deployment/terraform/shared/variables.tf similarity index 100% rename from infrastructure/terraform/shared/variables.tf rename to deployment/terraform/shared/variables.tf diff --git a/domain/__init__.py b/domain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/domain/tasks/__init__.py b/domain/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/domain/tasks/subtasks.py b/domain/tasks/subtasks.py new file mode 100644 index 00000000..bd49a6ec --- /dev/null +++ b/domain/tasks/subtasks.py @@ -0,0 +1,55 @@ +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Optional +from uuid import UUID, uuid4 + + +class SubTaskStatus(str, Enum): + WAITING = "waiting" + IN_PROGRESS = "in progress" + COMPLETE = "complete" + FAILED = "failed" + + +@dataclass +class SubTask: + id: UUID + task_id: UUID + status: SubTaskStatus = SubTaskStatus.WAITING + inputs: Optional[dict[str, Any]] = None + outputs: Optional[dict[str, Any]] = None + cloud_logs_url: Optional[str] = None + job_started: Optional[datetime] = None + job_completed: Optional[datetime] = None + + @classmethod + def create( + cls, *, task_id: UUID, inputs: Optional[dict[str, Any]] = None + ) -> "SubTask": + return cls( + id=uuid4(), + task_id=task_id, + status=SubTaskStatus.WAITING, + inputs=inputs, + ) + + def start(self, cloud_logs_url: Optional[str] = None) -> None: + if self.status not in (SubTaskStatus.WAITING, SubTaskStatus.IN_PROGRESS): + raise ValueError(f"cannot start subtask in status {self.status}") + if self.job_started is None: + self.job_started = datetime.now(timezone.utc) + self.status = SubTaskStatus.IN_PROGRESS + if cloud_logs_url is not None: + self.cloud_logs_url = cloud_logs_url + + def complete(self, result: Any = None) -> None: + self.status = SubTaskStatus.COMPLETE + self.job_completed = datetime.now(timezone.utc) + if result is not None: + self.outputs = {"result": result} + + def fail(self, error: BaseException) -> None: + self.status = SubTaskStatus.FAILED + self.job_completed = datetime.now(timezone.utc) + self.outputs = {"error": str(error)} diff --git a/domain/tasks/tasks.py b/domain/tasks/tasks.py new file mode 100644 index 00000000..177258d6 --- /dev/null +++ b/domain/tasks/tasks.py @@ -0,0 +1,94 @@ +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum +from typing import Optional +from uuid import UUID, uuid4 + +from domain.tasks.subtasks import SubTaskStatus + + +class TaskStatus(str, Enum): + WAITING = "waiting" + IN_PROGRESS = "in progress" + COMPLETE = "complete" + FAILED = "failed" + + +class Source(str, Enum): + PORTFOLIO = "portfolio_id" + HUBSPOT_DEAL = "hubspot_deal_id" + + +@dataclass +class Task: + id: UUID + task_source: str + status: TaskStatus = TaskStatus.WAITING + service: Optional[str] = None + source: Optional[Source] = None + source_id: Optional[str] = None + job_started: Optional[datetime] = None + job_completed: Optional[datetime] = None + + @classmethod + def create( + cls, + *, + task_source: str, + service: Optional[str] = None, + source: Optional[Source] = None, + source_id: Optional[str] = None, + ) -> "Task": + if not task_source.strip(): + raise ValueError("task_source must be non-empty") + return cls( + id=uuid4(), + task_source=task_source, + service=service, + source=source, + source_id=source_id, + status=TaskStatus.WAITING, + job_started=datetime.now(timezone.utc), + ) + + def start(self) -> None: + if self.status not in (TaskStatus.WAITING, TaskStatus.IN_PROGRESS): + raise ValueError(f"cannot start task in status {self.status}") + if self.job_started is None: + self.job_started = datetime.now(timezone.utc) + self.status = TaskStatus.IN_PROGRESS + + def complete(self) -> None: + self.status = TaskStatus.COMPLETE + self.job_completed = datetime.now(timezone.utc) + + def fail(self) -> None: + self.status = TaskStatus.FAILED + self.job_completed = datetime.now(timezone.utc) + + def recalculate_from_subtasks(self, statuses: list[SubTaskStatus]) -> None: + """Recompute Task.status from its SubTasks' statuses. + + Rule (preserved from legacy _update_task_progress): + - any FAILED → FAILED + - all COMPLETE → COMPLETE + - any IN_PROGRESS → IN_PROGRESS + - otherwise → WAITING + + Empty list is a no-op (newly-created task with no subtasks). + """ + if not statuses: + return + now = datetime.now(timezone.utc) + if SubTaskStatus.FAILED in statuses: + self.status = TaskStatus.FAILED + self.job_completed = now + elif all(s is SubTaskStatus.COMPLETE for s in statuses): + self.status = TaskStatus.COMPLETE + self.job_completed = now + elif SubTaskStatus.IN_PROGRESS in statuses: + self.status = TaskStatus.IN_PROGRESS + self.job_completed = None + else: + self.status = TaskStatus.WAITING + self.job_completed = None diff --git a/infrastructure/__init__.py b/infrastructure/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/infrastructure/postgres/__init__.py b/infrastructure/postgres/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/infrastructure/postgres/config.py b/infrastructure/postgres/config.py new file mode 100644 index 00000000..c39c6f30 --- /dev/null +++ b/infrastructure/postgres/config.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass +from typing import Mapping + + +@dataclass(frozen=True) +class PostgresConfig: + host: str + port: int + username: str + password: str + database: str + driver: str = "psycopg2" + pool_size: int = 3 + max_overflow: int = 5 + pool_pre_ping: bool = True + pool_recycle: int = 300 + + def url(self) -> str: + return ( + f"postgresql+{self.driver}://" + f"{self.username}:{self.password}@{self.host}:{self.port}/{self.database}" + ) + + @classmethod + def from_env(cls, env: Mapping[str, str]) -> "PostgresConfig": + return cls( + host=env["POSTGRES_HOST"], + port=int(env["POSTGRES_PORT"]), + username=env["POSTGRES_USERNAME"], + password=env["POSTGRES_PASSWORD"], + database=env["POSTGRES_DATABASE"], + driver=env.get("POSTGRES_DRIVER", "psycopg2"), + ) diff --git a/infrastructure/postgres/engine.py b/infrastructure/postgres/engine.py new file mode 100644 index 00000000..0de9efcb --- /dev/null +++ b/infrastructure/postgres/engine.py @@ -0,0 +1,18 @@ +from sqlalchemy.engine import Engine +from sqlmodel import Session, create_engine + +from infrastructure.postgres.config import PostgresConfig + + +def make_engine(config: PostgresConfig) -> Engine: + return create_engine( + config.url(), + pool_size=config.pool_size, + max_overflow=config.max_overflow, + pool_pre_ping=config.pool_pre_ping, + pool_recycle=config.pool_recycle, + ) + + +def make_session(engine: Engine) -> Session: + return Session(engine) diff --git a/infrastructure/postgres/subtask_table.py b/infrastructure/postgres/subtask_table.py new file mode 100644 index 00000000..dec34fbf --- /dev/null +++ b/infrastructure/postgres/subtask_table.py @@ -0,0 +1,21 @@ +from datetime import datetime, timezone +from typing import ClassVar, Optional +from uuid import UUID, uuid4 + +from sqlmodel import Field, SQLModel + + +class SubTaskRow(SQLModel, table=True): + __tablename__: ClassVar[str] = "sub_task" # pyright: ignore[reportIncompatibleVariableOverride] + + id: UUID = Field(default_factory=uuid4, primary_key=True, index=True) + task_id: UUID = Field(foreign_key="tasks.id") + job_started: Optional[datetime] = None + job_completed: Optional[datetime] = None + status: str = Field(default="waiting") + inputs: Optional[str] = None + outputs: Optional[str] = None + cloud_logs_url: Optional[str] = None + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) diff --git a/infrastructure/postgres/task_table.py b/infrastructure/postgres/task_table.py new file mode 100644 index 00000000..32e5450b --- /dev/null +++ b/infrastructure/postgres/task_table.py @@ -0,0 +1,36 @@ +from datetime import datetime, timezone +from typing import ClassVar, Optional +from uuid import UUID, uuid4 + +from sqlalchemy import Column +from sqlalchemy import Enum as SAEnum +from sqlmodel import Field, SQLModel + +from domain.tasks.tasks import Source + + +class TaskRow(SQLModel, table=True): + __tablename__: ClassVar[str] = "tasks" # pyright: ignore[reportIncompatibleVariableOverride] + + id: UUID = Field(default_factory=uuid4, primary_key=True, index=True) + task_source: str + job_started: Optional[datetime] = None + job_completed: Optional[datetime] = None + status: str = Field(default="waiting") + service: Optional[str] = None + updated_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) + ) + + source: Optional[Source] = Field( + default=None, + sa_column=Column( + SAEnum( + Source, + name="source", + values_callable=lambda cls: [m.value for m in cls], # pyright: ignore[reportUnknownLambdaType, reportUnknownMemberType, reportUnknownVariableType] + ), + nullable=True, + ), + ) + source_id: Optional[str] = None diff --git a/orchestration/__init__.py b/orchestration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/task_orchestrator.py b/orchestration/task_orchestrator.py new file mode 100644 index 00000000..6c67d1ce --- /dev/null +++ b/orchestration/task_orchestrator.py @@ -0,0 +1,96 @@ +from typing import Any, Callable, Optional +from uuid import UUID + +from domain.tasks.subtasks import SubTask +from domain.tasks.tasks import Source, Task +from repositories.tasks.subtask_repository import SubTaskRepository +from repositories.tasks.task_repository import TaskRepository +from utilities.private import private + + +class TaskOrchestrator: + """Coordinates Task + SubTask lifecycle. + + Exposes primitives (start/complete/fail_subtask) for handlers that want + fine-grained control, and a high-level run_subtask wrapper that owns the + try/except so it can replace the body of the legacy subtask_handler + decorator in backend/utils/subtasks.py. + + Each primitive saves the SubTask, then recomputes the parent Task's + status from all its children. + """ + + def __init__( + self, + task_repo: TaskRepository, + subtask_repo: SubTaskRepository, + ) -> None: + self._tasks = task_repo + self._subtasks = subtask_repo + + def create_task_with_subtask( + self, + *, + task_source: str, + inputs: Optional[dict[str, Any]] = None, + service: Optional[str] = None, + source: Optional[Source] = None, + source_id: Optional[str] = None, + ) -> tuple[Task, SubTask]: + task = Task.create( + task_source=task_source, + service=service, + source=source, + source_id=source_id, + ) + self._tasks.create(task) + subtask = SubTask.create(task_id=task.id, inputs=inputs) + self._subtasks.create(subtask) + return task, subtask + + def start_subtask( + self, subtask_id: UUID, cloud_logs_url: Optional[str] = None + ) -> SubTask: + subtask = self._subtasks.get(subtask_id) + subtask.start(cloud_logs_url) + self._subtasks.save(subtask) + self._cascade(subtask.task_id) + return subtask + + def complete_subtask( + self, subtask_id: UUID, result: Any = None + ) -> SubTask: + subtask = self._subtasks.get(subtask_id) + subtask.complete(result) + self._subtasks.save(subtask) + self._cascade(subtask.task_id) + return subtask + + def fail_subtask(self, subtask_id: UUID, error: BaseException) -> SubTask: + subtask = self._subtasks.get(subtask_id) + subtask.fail(error) + self._subtasks.save(subtask) + self._cascade(subtask.task_id) + return subtask + + def run_subtask( + self, + subtask_id: UUID, + work: Callable[[], Any], + cloud_logs_url: Optional[str] = None, + ) -> Any: + self.start_subtask(subtask_id, cloud_logs_url) + try: + result = work() + except Exception as e: + self.fail_subtask(subtask_id, e) + raise + self.complete_subtask(subtask_id, result) + return result + + @private + def _cascade(self, task_id: UUID) -> None: + statuses = [s.status for s in self._subtasks.list_by_task(task_id)] + task = self._tasks.get(task_id) + task.recalculate_from_subtasks(statuses) + self._tasks.save(task) diff --git a/repositories/__init__.py b/repositories/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/repositories/tasks/__init__.py b/repositories/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/repositories/tasks/subtask_postgres_repository.py b/repositories/tasks/subtask_postgres_repository.py new file mode 100644 index 00000000..affc280e --- /dev/null +++ b/repositories/tasks/subtask_postgres_repository.py @@ -0,0 +1,89 @@ +import json +from datetime import datetime, timezone +from typing import Any, Optional +from uuid import UUID + +from sqlmodel import Session, select + +from domain.tasks.subtasks import SubTask, SubTaskStatus +from infrastructure.postgres.subtask_table import SubTaskRow +from repositories.tasks.subtask_repository import SubTaskRepository +from utilities.private import private + + +class SubTaskPostgresRepository(SubTaskRepository): + def __init__(self, session: Session) -> None: + self._session = session + + def create(self, subtask: SubTask) -> SubTask: + row = self._to_row(subtask) + self._session.add(row) + self._session.commit() + self._session.refresh(row) + return self._to_domain(row) + + def get(self, subtask_id: UUID) -> SubTask: + row = self._session.get(SubTaskRow, subtask_id) + if row is None: + raise ValueError(f"SubTask {subtask_id} not found") + return self._to_domain(row) + + def save(self, subtask: SubTask) -> None: + row = self._session.get(SubTaskRow, subtask.id) + if row is None: + raise ValueError(f"SubTask {subtask.id} not found") + row.status = subtask.status.value + row.job_started = subtask.job_started + row.job_completed = subtask.job_completed + row.inputs = ( + json.dumps(subtask.inputs) if subtask.inputs is not None else None + ) + row.outputs = ( + json.dumps(subtask.outputs) if subtask.outputs is not None else None + ) + row.cloud_logs_url = subtask.cloud_logs_url + row.updated_at = datetime.now(timezone.utc) + self._session.add(row) + self._session.commit() + + def list_by_task(self, task_id: UUID) -> list[SubTask]: + rows = self._session.exec( + select(SubTaskRow).where(SubTaskRow.task_id == task_id) + ).all() + return [self._to_domain(r) for r in rows] + + @private + def _to_row(self, subtask: SubTask) -> SubTaskRow: + return SubTaskRow( + id=subtask.id, + task_id=subtask.task_id, + status=subtask.status.value, + inputs=( + json.dumps(subtask.inputs) if subtask.inputs is not None else None + ), + outputs=( + json.dumps(subtask.outputs) + if subtask.outputs is not None + else None + ), + cloud_logs_url=subtask.cloud_logs_url, + job_started=subtask.job_started, + job_completed=subtask.job_completed, + ) + + @private + def _to_domain(self, row: SubTaskRow) -> SubTask: + return SubTask( + id=row.id, + task_id=row.task_id, + status=SubTaskStatus(row.status.lower()), + inputs=_loads_or_none(row.inputs), + outputs=_loads_or_none(row.outputs), + cloud_logs_url=row.cloud_logs_url, + job_started=row.job_started, + job_completed=row.job_completed, + ) + + +def _loads_or_none(s: Optional[str]) -> Optional[dict[str, Any]]: + return json.loads(s) if s else None diff --git a/repositories/tasks/subtask_repository.py b/repositories/tasks/subtask_repository.py new file mode 100644 index 00000000..adb36f99 --- /dev/null +++ b/repositories/tasks/subtask_repository.py @@ -0,0 +1,18 @@ +from abc import ABC, abstractmethod +from uuid import UUID + +from domain.tasks.subtasks import SubTask + + +class SubTaskRepository(ABC): + @abstractmethod + def create(self, subtask: SubTask) -> SubTask: ... + + @abstractmethod + def get(self, subtask_id: UUID) -> SubTask: ... + + @abstractmethod + def save(self, subtask: SubTask) -> None: ... + + @abstractmethod + def list_by_task(self, task_id: UUID) -> list[SubTask]: ... diff --git a/repositories/tasks/task_postgres_repository.py b/repositories/tasks/task_postgres_repository.py new file mode 100644 index 00000000..d23fe91c --- /dev/null +++ b/repositories/tasks/task_postgres_repository.py @@ -0,0 +1,77 @@ +""" +Postgres implementation of TaskRepository. + +NOTE: this repository owns only the `tasks` table. Unlike the legacy +backend.app.db.functions.tasks.Tasks.TasksInterface.create_task, it does NOT +auto-create a child SubTask. Do not rewire existing Lambda callers to this +repo until the SubTask aggregate + TaskOrchestrator slice lands — they would +silently lose their initial SubTask row. +""" + +from datetime import datetime, timezone +from uuid import UUID + +from sqlmodel import Session + +from domain.tasks.tasks import Task, TaskStatus +from infrastructure.postgres.task_table import TaskRow +from repositories.tasks.task_repository import TaskRepository +from utilities.private import private + + +class TaskPostgresRepository(TaskRepository): + def __init__(self, session: Session) -> None: + self._session = session + + def create(self, task: Task) -> Task: + row = self._to_row(task) + self._session.add(row) + self._session.commit() + self._session.refresh(row) + return self._to_domain(row) + + def get(self, task_id: UUID) -> Task: + row = self._session.get(TaskRow, task_id) + if row is None: + raise ValueError(f"Task {task_id} not found") + return self._to_domain(row) + + def save(self, task: Task) -> None: + row = self._session.get(TaskRow, task.id) + if row is None: + raise ValueError(f"Task {task.id} not found") + row.status = task.status.value + row.job_started = task.job_started + row.job_completed = task.job_completed + row.service = task.service + row.source = task.source + row.source_id = task.source_id + row.updated_at = datetime.now(timezone.utc) + self._session.add(row) + self._session.commit() + + @private + def _to_row(self, task: Task) -> TaskRow: + return TaskRow( + id=task.id, + task_source=task.task_source, + status=task.status.value, + service=task.service, + source=task.source, + source_id=task.source_id, + job_started=task.job_started, + job_completed=task.job_completed, + ) + + @private + def _to_domain(self, row: TaskRow) -> Task: + return Task( + id=row.id, + task_source=row.task_source, + status=TaskStatus(row.status.lower()), + service=row.service, + source=row.source, + source_id=row.source_id, + job_started=row.job_started, + job_completed=row.job_completed, + ) diff --git a/repositories/tasks/task_repository.py b/repositories/tasks/task_repository.py new file mode 100644 index 00000000..8bdce0cc --- /dev/null +++ b/repositories/tasks/task_repository.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod +from uuid import UUID + +from domain.tasks.tasks import Task + + +class TaskRepository(ABC): + @abstractmethod + def create(self, task: Task) -> Task: ... + + @abstractmethod + def get(self, task_id: UUID) -> Task: ... + + @abstractmethod + def save(self, task: Task) -> None: ... diff --git a/run_backlog.sh b/run_backlog.sh deleted file mode 100644 index 398e921c..00000000 --- a/run_backlog.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -backlog browser --port 6421 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/domain/__init__.py b/tests/domain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/domain/tasks/__init__.py b/tests/domain/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/domain/tasks/test_subtasks.py b/tests/domain/tasks/test_subtasks.py new file mode 100644 index 00000000..2721d38f --- /dev/null +++ b/tests/domain/tasks/test_subtasks.py @@ -0,0 +1,75 @@ +from uuid import uuid4 + +import pytest + +from domain.tasks.subtasks import SubTask, SubTaskStatus + + +def test_create_subtask_starts_waiting() -> None: + task_id = uuid4() + + st = SubTask.create(task_id=task_id, inputs={"foo": "bar"}) + + assert st.task_id == task_id + assert st.status is SubTaskStatus.WAITING + assert st.inputs == {"foo": "bar"} + assert st.outputs is None + assert st.job_started is None + assert st.job_completed is None + + +def test_start_transitions_to_in_progress_and_sets_cloud_logs_url() -> None: + st = SubTask.create(task_id=uuid4()) + + st.start(cloud_logs_url="https://example/log") + + assert st.status is SubTaskStatus.IN_PROGRESS + assert st.cloud_logs_url == "https://example/log" + assert st.job_started is not None + + +def test_start_is_idempotent_from_in_progress() -> None: + st = SubTask.create(task_id=uuid4()) + st.start() + first_start = st.job_started + + st.start(cloud_logs_url="https://other") + + assert st.status is SubTaskStatus.IN_PROGRESS + assert st.job_started == first_start # not overwritten + assert st.cloud_logs_url == "https://other" + + +def test_start_rejects_from_terminal_status() -> None: + st = SubTask.create(task_id=uuid4()) + st.complete() + with pytest.raises(ValueError): + st.start() + + +def test_complete_marks_outputs_and_job_completed() -> None: + st = SubTask.create(task_id=uuid4()) + st.start() + + st.complete({"uprn": "123"}) + + assert st.status is SubTaskStatus.COMPLETE + assert st.outputs == {"result": {"uprn": "123"}} + assert st.job_completed is not None + + +def test_complete_without_result_leaves_outputs_unset() -> None: + st = SubTask.create(task_id=uuid4()) + st.complete() + assert st.outputs is None + + +def test_fail_records_error_in_outputs() -> None: + st = SubTask.create(task_id=uuid4()) + err = RuntimeError("boom") + + st.fail(err) + + assert st.status is SubTaskStatus.FAILED + assert st.outputs == {"error": "boom"} + assert st.job_completed is not None diff --git a/tests/domain/tasks/test_tasks.py b/tests/domain/tasks/test_tasks.py new file mode 100644 index 00000000..f30c0aa1 --- /dev/null +++ b/tests/domain/tasks/test_tasks.py @@ -0,0 +1,104 @@ +import pytest + +from domain.tasks.subtasks import SubTaskStatus +from domain.tasks.tasks import Source, Task, TaskStatus + + +def test_create_task_starts_waiting() -> None: + # Arrange / Act + t = Task.create( + task_source="manual:test", source=Source.PORTFOLIO, source_id="abc-123" + ) + + # Assert + assert t.status is TaskStatus.WAITING + assert t.source is Source.PORTFOLIO + assert t.source_id == "abc-123" + assert t.job_started is not None + assert t.job_completed is None + + +def test_create_task_rejects_blank_task_source() -> None: + with pytest.raises(ValueError, match="task_source"): + Task.create(task_source=" ") + + +def test_start_transitions_to_in_progress() -> None: + t = Task.create(task_source="manual:test") + t.start() + assert t.status is TaskStatus.IN_PROGRESS + + +def test_complete_marks_job_completed() -> None: + t = Task.create(task_source="manual:test") + t.start() + t.complete() + assert t.status is TaskStatus.COMPLETE + assert t.job_completed is not None + + +def test_fail_marks_job_completed() -> None: + t = Task.create(task_source="manual:test") + t.fail() + assert t.status is TaskStatus.FAILED + assert t.job_completed is not None + + +def test_start_rejects_from_terminal_status() -> None: + t = Task.create(task_source="manual:test") + t.complete() + with pytest.raises(ValueError): + t.start() + + +def test_recalculate_with_empty_statuses_is_noop() -> None: + t = Task.create(task_source="manual:test") + original_status = t.status + original_completed = t.job_completed + + t.recalculate_from_subtasks([]) + + assert t.status is original_status + assert t.job_completed is original_completed + + +def test_recalculate_all_waiting_keeps_waiting() -> None: + t = Task.create(task_source="manual:test") + t.start() # task moved to IN_PROGRESS earlier + t.complete() # then COMPLETE, with job_completed set + + t.recalculate_from_subtasks([SubTaskStatus.WAITING, SubTaskStatus.WAITING]) + + assert t.status is TaskStatus.WAITING + assert t.job_completed is None + + +def test_recalculate_any_in_progress_marks_in_progress() -> None: + t = Task.create(task_source="manual:test") + + t.recalculate_from_subtasks( + [SubTaskStatus.WAITING, SubTaskStatus.IN_PROGRESS, SubTaskStatus.COMPLETE] + ) + + assert t.status is TaskStatus.IN_PROGRESS + assert t.job_completed is None + + +def test_recalculate_all_complete_marks_complete() -> None: + t = Task.create(task_source="manual:test") + + t.recalculate_from_subtasks([SubTaskStatus.COMPLETE, SubTaskStatus.COMPLETE]) + + assert t.status is TaskStatus.COMPLETE + assert t.job_completed is not None + + +def test_recalculate_any_failed_marks_failed_even_with_others() -> None: + t = Task.create(task_source="manual:test") + + t.recalculate_from_subtasks( + [SubTaskStatus.IN_PROGRESS, SubTaskStatus.COMPLETE, SubTaskStatus.FAILED] + ) + + assert t.status is TaskStatus.FAILED + assert t.job_completed is not None diff --git a/tests/orchestration/__init__.py b/tests/orchestration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/orchestration/test_task_orchestrator.py b/tests/orchestration/test_task_orchestrator.py new file mode 100644 index 00000000..1a48127f --- /dev/null +++ b/tests/orchestration/test_task_orchestrator.py @@ -0,0 +1,151 @@ +from collections.abc import Iterator +from dataclasses import dataclass + +import pytest +from sqlmodel import Session, SQLModel, create_engine + +from domain.tasks.subtasks import SubTask, SubTaskStatus +from domain.tasks.tasks import Source, TaskStatus +from orchestration.task_orchestrator import TaskOrchestrator +from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository +from repositories.tasks.task_postgres_repository import TaskPostgresRepository + + +@dataclass +class Harness: + orchestrator: TaskOrchestrator + tasks: TaskPostgresRepository + subtasks: SubTaskPostgresRepository + + +@pytest.fixture +def harness() -> Iterator[Harness]: + engine = create_engine("sqlite://") + SQLModel.metadata.create_all(engine) + with Session(engine) as session: + tasks = TaskPostgresRepository(session=session) + subtasks = SubTaskPostgresRepository(session=session) + yield Harness( + orchestrator=TaskOrchestrator(task_repo=tasks, subtask_repo=subtasks), + tasks=tasks, + subtasks=subtasks, + ) + + +def test_create_task_with_subtask_creates_both_in_waiting( + harness: Harness, +) -> None: + task, subtask = harness.orchestrator.create_task_with_subtask( + task_source="manual:test", + inputs={"foo": "bar"}, + source=Source.PORTFOLIO, + source_id="abc", + ) + + assert task.status is TaskStatus.WAITING + assert subtask.status is SubTaskStatus.WAITING + assert subtask.task_id == task.id + assert subtask.inputs == {"foo": "bar"} + + +def test_start_subtask_cascades_to_in_progress(harness: Harness) -> None: + task, subtask = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + + started = harness.orchestrator.start_subtask( + subtask.id, cloud_logs_url="https://example/log" + ) + + assert started.status is SubTaskStatus.IN_PROGRESS + assert started.cloud_logs_url == "https://example/log" + assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS + + +def test_complete_subtask_cascades_to_complete(harness: Harness) -> None: + task, subtask = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + harness.orchestrator.start_subtask(subtask.id) + + harness.orchestrator.complete_subtask(subtask.id, {"value": 42}) + + done_subtask = harness.subtasks.get(subtask.id) + done_task = harness.tasks.get(task.id) + assert done_subtask.outputs == {"result": {"value": 42}} + assert done_task.status is TaskStatus.COMPLETE + assert done_task.job_completed is not None + + +def test_fail_subtask_cascades_to_failed(harness: Harness) -> None: + task, subtask = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + + harness.orchestrator.fail_subtask(subtask.id, RuntimeError("boom")) + + failed_subtask = harness.subtasks.get(subtask.id) + failed_task = harness.tasks.get(task.id) + assert failed_subtask.outputs == {"error": "boom"} + assert failed_task.status is TaskStatus.FAILED + + +def test_failed_subtask_locks_task_failed_even_with_others_complete( + harness: Harness, +) -> None: + task, first = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + second = SubTask.create(task_id=task.id) + harness.subtasks.create(second) + + harness.orchestrator.complete_subtask(first.id) + harness.orchestrator.fail_subtask(second.id, RuntimeError("nope")) + + assert harness.tasks.get(task.id).status is TaskStatus.FAILED + + +def test_mixed_complete_and_in_progress_keeps_task_in_progress( + harness: Harness, +) -> None: + task, first = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + second = SubTask.create(task_id=task.id) + harness.subtasks.create(second) + + harness.orchestrator.complete_subtask(first.id) + harness.orchestrator.start_subtask(second.id) + + assert harness.tasks.get(task.id).status is TaskStatus.IN_PROGRESS + + +def test_run_subtask_happy_path_returns_result_and_cascades_complete( + harness: Harness, +) -> None: + task, subtask = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + + result = harness.orchestrator.run_subtask(subtask.id, work=lambda: {"answer": 42}) + + assert result == {"answer": 42} + assert harness.subtasks.get(subtask.id).status is SubTaskStatus.COMPLETE + assert harness.tasks.get(task.id).status is TaskStatus.COMPLETE + + +def test_run_subtask_failing_work_marks_failed_and_reraises( + harness: Harness, +) -> None: + task, subtask = harness.orchestrator.create_task_with_subtask( + task_source="manual:test" + ) + + def boom() -> None: + raise RuntimeError("boom") + + with pytest.raises(RuntimeError, match="boom"): + harness.orchestrator.run_subtask(subtask.id, work=boom) + + assert harness.subtasks.get(subtask.id).status is SubTaskStatus.FAILED + assert harness.tasks.get(task.id).status is TaskStatus.FAILED diff --git a/tests/repositories/__init__.py b/tests/repositories/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/repositories/tasks/__init__.py b/tests/repositories/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/repositories/tasks/postgres/__init__.py b/tests/repositories/tasks/postgres/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/repositories/tasks/postgres/test_subtask_postgres_repository.py b/tests/repositories/tasks/postgres/test_subtask_postgres_repository.py new file mode 100644 index 00000000..ac39e089 --- /dev/null +++ b/tests/repositories/tasks/postgres/test_subtask_postgres_repository.py @@ -0,0 +1,81 @@ +from collections.abc import Iterator +from uuid import uuid4 + +import pytest +from sqlmodel import Session, SQLModel, create_engine + +# Importing the SQLModel row modules registers their tables in +# SQLModel.metadata so create_all builds both. Imports look unused; they aren't. +import infrastructure.postgres.subtask_table # noqa: F401 # pyright: ignore[reportUnusedImport] +import infrastructure.postgres.task_table # noqa: F401 # pyright: ignore[reportUnusedImport] +from domain.tasks.subtasks import SubTask, SubTaskStatus +from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository + + +@pytest.fixture +def session() -> Iterator[Session]: + engine = create_engine("sqlite://") + SQLModel.metadata.create_all(engine) + with Session(engine) as s: + yield s + + +def test_create_and_get_round_trip_preserves_inputs(session: Session) -> None: + repo = SubTaskPostgresRepository(session=session) + task_id = uuid4() + st = SubTask.create(task_id=task_id, inputs={"address": "68 Glendon Way"}) + + repo.create(st) + fetched = repo.get(st.id) + + assert fetched.id == st.id + assert fetched.task_id == task_id + assert fetched.status is SubTaskStatus.WAITING + assert fetched.inputs == {"address": "68 Glendon Way"} + assert fetched.outputs is None + + +def test_save_persists_status_and_outputs(session: Session) -> None: + repo = SubTaskPostgresRepository(session=session) + st = SubTask.create(task_id=uuid4()) + repo.create(st) + + st.start(cloud_logs_url="https://example/log") + repo.save(st) + assert repo.get(st.id).status is SubTaskStatus.IN_PROGRESS + + st.complete({"uprn": "123"}) + repo.save(st) + done = repo.get(st.id) + assert done.status is SubTaskStatus.COMPLETE + assert done.outputs == {"result": {"uprn": "123"}} + assert done.cloud_logs_url == "https://example/log" + assert done.job_completed is not None + + +def test_list_by_task_filters_by_task_id(session: Session) -> None: + repo = SubTaskPostgresRepository(session=session) + task_a = uuid4() + task_b = uuid4() + repo.create(SubTask.create(task_id=task_a)) + repo.create(SubTask.create(task_id=task_a)) + repo.create(SubTask.create(task_id=task_b)) + + a_results = repo.list_by_task(task_a) + b_results = repo.list_by_task(task_b) + + assert len(a_results) == 2 + assert len(b_results) == 1 + assert all(s.task_id == task_a for s in a_results) + assert all(s.task_id == task_b for s in b_results) + + +def test_list_by_task_returns_empty_for_unknown_task(session: Session) -> None: + repo = SubTaskPostgresRepository(session=session) + assert repo.list_by_task(uuid4()) == [] + + +def test_get_missing_raises(session: Session) -> None: + repo = SubTaskPostgresRepository(session=session) + with pytest.raises(ValueError, match="not found"): + repo.get(uuid4()) diff --git a/tests/repositories/tasks/postgres/test_task_postgres_repository.py b/tests/repositories/tasks/postgres/test_task_postgres_repository.py new file mode 100644 index 00000000..3e1aa226 --- /dev/null +++ b/tests/repositories/tasks/postgres/test_task_postgres_repository.py @@ -0,0 +1,68 @@ +from collections.abc import Iterator +from uuid import uuid4 + +import pytest +from sqlmodel import Session, SQLModel, create_engine + +from domain.tasks.tasks import Source, Task, TaskStatus +from infrastructure.postgres.task_table import TaskRow +from repositories.tasks.task_postgres_repository import TaskPostgresRepository + + +@pytest.fixture +def session() -> Iterator[Session]: + engine = create_engine("sqlite://") + SQLModel.metadata.create_all(engine) + with Session(engine) as s: + yield s + + +def test_create_and_get_round_trip(session: Session) -> None: + # Arrange + repo = TaskPostgresRepository(session=session) + t = Task.create( + task_source="manual:test", source=Source.PORTFOLIO, source_id="abc-123" + ) + + # Act + repo.create(t) + fetched = repo.get(t.id) + + # Assert + assert fetched.id == t.id + assert fetched.status is TaskStatus.WAITING + assert fetched.source is Source.PORTFOLIO + assert fetched.source_id == "abc-123" + + +def test_save_persists_status_transition(session: Session) -> None: + repo = TaskPostgresRepository(session=session) + t = Task.create(task_source="manual:test") + repo.create(t) + + t.start() + repo.save(t) + assert repo.get(t.id).status is TaskStatus.IN_PROGRESS + + t.complete() + repo.save(t) + done = repo.get(t.id) + assert done.status is TaskStatus.COMPLETE + assert done.job_completed is not None + + +def test_get_missing_raises(session: Session) -> None: + repo = TaskPostgresRepository(session=session) + with pytest.raises(ValueError, match="not found"): + repo.get(uuid4()) + + +def test_get_normalises_legacy_capitalised_status(session: Session) -> None: + # Existing rows written by backend code use "In Progress" (capitalised). + repo = TaskPostgresRepository(session=session) + row = TaskRow(task_source="manual:test", status="In Progress") + session.add(row) + session.commit() + + fetched = repo.get(row.id) + assert fetched.status is TaskStatus.IN_PROGRESS diff --git a/utilities/__init__.py b/utilities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utilities/aws_lambda/__init__.py b/utilities/aws_lambda/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utilities/aws_lambda/default_orchestrator.py b/utilities/aws_lambda/default_orchestrator.py new file mode 100644 index 00000000..f78886b9 --- /dev/null +++ b/utilities/aws_lambda/default_orchestrator.py @@ -0,0 +1,26 @@ +import os +from collections.abc import Generator +from contextlib import contextmanager + +from sqlmodel import Session + +from infrastructure.postgres.config import PostgresConfig +from infrastructure.postgres.engine import make_engine +from orchestration.task_orchestrator import TaskOrchestrator +from repositories.tasks.subtask_postgres_repository import SubTaskPostgresRepository +from repositories.tasks.task_postgres_repository import TaskPostgresRepository + + +@contextmanager +def default_orchestrator() -> Generator[TaskOrchestrator, None, None]: + """Yield a TaskOrchestrator wired to a fresh Postgres session. + + Connection params come from os.environ via PostgresConfig.from_env. Each + handler invocation gets its own session, cleaned up on context exit. + """ + engine = make_engine(PostgresConfig.from_env(dict(os.environ))) + with Session(engine) as session: + yield TaskOrchestrator( + task_repo=TaskPostgresRepository(session=session), + subtask_repo=SubTaskPostgresRepository(session=session), + ) diff --git a/utilities/aws_lambda/subtask_handler.py b/utilities/aws_lambda/subtask_handler.py new file mode 100644 index 00000000..64c1daa6 --- /dev/null +++ b/utilities/aws_lambda/subtask_handler.py @@ -0,0 +1,67 @@ +"""@subtask_handler decorator for Lambdas that operate on existing SubTasks. + +Translates an AWS Lambda invocation (SQS-shaped or direct) into +TaskOrchestrator.run_subtask(...) calls. +""" + +import json +from contextlib import AbstractContextManager +from functools import wraps +from typing import Any, Callable, Optional, cast + +from utilities.aws_lambda.default_orchestrator import default_orchestrator +from utilities.aws_lambda.subtask_trigger_body import SubtaskTriggerBody +from orchestration.task_orchestrator import TaskOrchestrator + +OrchestratorCM = Callable[[], AbstractContextManager[TaskOrchestrator]] + + +def subtask_handler( + *, + orchestrator_cm: Optional[OrchestratorCM] = None, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Run the wrapped function as the body of an existing SubTask. + + For each record, validates the body via SubtaskTriggerBody (must contain + task_id and sub_task_id), then runs the function inside + orchestrator.run_subtask(...). The orchestrator owns the start/complete/ + fail lifecycle and cascades status into the parent Task. On failure the + underlying exception propagates after the SubTask is marked FAILED. + """ + factory = orchestrator_cm or default_orchestrator + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + def wrapper(event: dict[str, Any], context: Any) -> None: + with factory() as orchestrator: + for record in _records(event): + body = _parse_body(record) + trigger = SubtaskTriggerBody.model_validate(body) + orchestrator.run_subtask( + trigger.sub_task_id, + work=lambda body=body: func(body, context), + ) + + return wrapper + + return decorator + + +def _parse_body(record: dict[str, Any]) -> dict[str, Any]: + raw = record.get("body", record) + if isinstance(raw, str): + try: + parsed = json.loads(raw) + except json.JSONDecodeError: + return {} + return cast(dict[str, Any], parsed) if isinstance(parsed, dict) else {} + if isinstance(raw, dict): + return cast(dict[str, Any], raw) + return {} + + +def _records(event: dict[str, Any]) -> list[dict[str, Any]]: + raw_records = event.get("Records") + if isinstance(raw_records, list): + return [r for r in cast(list[Any], raw_records) if isinstance(r, dict)] + return [event] diff --git a/utilities/aws_lambda/subtask_trigger_body.py b/utilities/aws_lambda/subtask_trigger_body.py new file mode 100644 index 00000000..a6b539e5 --- /dev/null +++ b/utilities/aws_lambda/subtask_trigger_body.py @@ -0,0 +1,17 @@ +from uuid import UUID + +from pydantic import BaseModel, ConfigDict + + +class SubtaskTriggerBody(BaseModel): + """The minimum the subtask_handler needs to dispatch lifecycle calls. + + `extra="allow"` so the rest of the work payload passes through to the + decorated function untouched — handlers do their own model_validate on + the full body for fields specific to their use case. + """ + + model_config = ConfigDict(extra="allow") + + task_id: UUID + sub_task_id: UUID diff --git a/utilities/aws_lambda/task_handler.py b/utilities/aws_lambda/task_handler.py new file mode 100644 index 00000000..82c7198e --- /dev/null +++ b/utilities/aws_lambda/task_handler.py @@ -0,0 +1,98 @@ +"""@task_handler decorator for Lambdas that own the entire pipeline. + +Translates an AWS Lambda invocation (SQS-shaped or direct) into +TaskOrchestrator.create_task_with_subtask(...) + run_subtask(...). +""" + +import json +from contextlib import AbstractContextManager +from functools import wraps +from typing import Any, Callable, Optional, cast + +from utilities.aws_lambda.default_orchestrator import default_orchestrator +from domain.tasks.tasks import Source +from orchestration.task_orchestrator import TaskOrchestrator + +OrchestratorCM = Callable[[], AbstractContextManager[TaskOrchestrator]] + + +def task_handler( + *, + task_source: str, + source: Source, + orchestrator_cm: Optional[OrchestratorCM] = None, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Run the wrapped function as the body of a freshly-created Task + SubTask. + + For each record, creates a new Task + initial SubTask, then runs the + wrapped function inside orchestrator.run_subtask(...). `source_id` is + read from body[source.value] (silent None if absent — preserved from + legacy ADR-0001). + + Records-style events use SQS partial-batch-failure semantics: individual + failures are reported via {"batchItemFailures": [...]} rather than + propagating. Direct invocations re-raise. + """ + factory = orchestrator_cm or default_orchestrator + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + def wrapper(event: dict[str, Any], context: Any) -> Any: + with factory() as orchestrator: + results: list[Any] = [] + failures: list[dict[str, Any]] = [] + + for record in _records(event): + body = _parse_body(record) + raw_source_id = body.get(source.value) + source_id = ( + str(raw_source_id) if raw_source_id is not None else None + ) + + _, subtask = orchestrator.create_task_with_subtask( + task_source=task_source, + inputs=body, + source=source, + source_id=source_id, + ) + + try: + result = orchestrator.run_subtask( + subtask.id, + work=lambda body=body: func(body, context), + ) + results.append(result) + except Exception: + if "Records" in event: + message_id = record.get("messageId", "") + failures.append({"itemIdentifier": message_id}) + else: + raise + + if "Records" in event: + return {"batchItemFailures": failures} + return results + + return wrapper + + return decorator + + +def _parse_body(record: dict[str, Any]) -> dict[str, Any]: + raw = record.get("body", record) + if isinstance(raw, str): + try: + parsed = json.loads(raw) + except json.JSONDecodeError: + return {} + return cast(dict[str, Any], parsed) if isinstance(parsed, dict) else {} + if isinstance(raw, dict): + return cast(dict[str, Any], raw) + return {} + + +def _records(event: dict[str, Any]) -> list[dict[str, Any]]: + raw_records = event.get("Records") + if isinstance(raw_records, list): + return [r for r in cast(list[Any], raw_records) if isinstance(r, dict)] + return [event] diff --git a/utilities/private.py b/utilities/private.py new file mode 100644 index 00000000..77a70578 --- /dev/null +++ b/utilities/private.py @@ -0,0 +1,33 @@ +import inspect +from typing import Any, Callable + + +class private: + """Decorator that raises if a _-prefixed method is called from outside its class.""" + + func: Callable[..., Any] + name: str + owner: type + + def __init__(self, func: Callable[..., Any]) -> None: + self.func = func + self.name = getattr(func, "__name__", "") + + def __set_name__(self, owner: type, name: str) -> None: + self.owner = owner + + def __get__(self, instance: Any, owner: type) -> Callable[..., Any]: + # Walk up one frame to see who's calling + frame = inspect.currentframe() + if frame is None or frame.f_back is None: + raise RuntimeError("cannot inspect caller frame") + caller_frame = frame.f_back + caller_self = caller_frame.f_locals.get("self") + + if not isinstance(caller_self, self.owner): + raise RuntimeError( + f"{self.owner.__name__}.{self.name} is private; " + f"called from {caller_frame.f_code.co_name}" + ) + + return getattr(self.func, "__get__")(instance, owner)