diff --git a/applications/modelling_e2e/handler.py b/applications/modelling_e2e/handler.py index 0f2c4fe6..c2d11073 100644 --- a/applications/modelling_e2e/handler.py +++ b/applications/modelling_e2e/handler.py @@ -95,6 +95,7 @@ from repositories.property.property_overrides_postgres_reader import ( from repositories.scenario.scenario_postgres_repository import ( ScenarioPostgresRepository, ) +from repositories.solar.solar_postgres_repository import SolarPostgresRepository from utilities.aws_lambda.task_handler import task_handler from utilities.logger import setup_logger @@ -289,6 +290,7 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: try: scenario = ScenarioPostgresRepository(read_session).get_many([scenario_id])[0] products = catalogue_with_off_catalogue_overrides(read_session) + solar_reader = SolarPostgresRepository(read_session) failures: list[dict[str, Any]] = [] @@ -356,9 +358,18 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: landlord_overrides=overrides, ).effective_epc - solar_insights: Optional[dict[str, Any]] = ( - None if no_solar else _solar_insights_for(solar_client, spatial) - ) + # Read-before-fetch: the Google Solar call is paid, so skip it + # when this UPRN's insights are already persisted. Only a cache + # miss hits Google — re-runs cost nothing for solar. + solar_insights: Optional[dict[str, Any]] + solar_was_fetched = False + if no_solar: + solar_insights = None + else: + solar_insights = solar_reader.get(uprn) + if solar_insights is None: + solar_insights = _solar_insights_for(solar_client, spatial) + solar_was_fetched = solar_insights is not None # All Measure Types are considered: the off-catalogue overlay # (catalogue_with_off_catalogue_overrides) prices the measures the @@ -395,7 +406,8 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: if spatial is not None: uow.spatial.save(uprn, spatial) if ( - solar_insights is not None + solar_was_fetched + and solar_insights is not None and spatial is not None and spatial.coordinates is not None ): @@ -453,22 +465,29 @@ def handler(body: dict[str, Any], context: Any) -> Optional[dict[str, Any]]: f"{[s['certificate_number'] for s in skipped_certs]}" ) - if failures: - failed_ids = [f["property_id"] for f in failures] - # Persisted verbatim into the subtask's outputs.error (via - # SubTask.fail), so include each property's error type + message — - # not just the IDs — to make failed runs diagnosable without - # cross-referencing CloudWatch. - message = ( - f"failed property_ids: {failed_ids}; " - f"details: {json.dumps(failures)}" - ) - if skipped_certs: - message += ( - f"; skipped_unmappable_cohort_certs: {json.dumps(skipped_certs)}" + # A property that errored AND a cohort cert the mapper could not consume + # are both surfaced as failures, so the subtask is marked failed and + # shows up for debugging. The whole batch has already run by this point — + # every property that could be modelled was written to DB above — so + # failing here flags the run without discarding the work done so far. + if failures or skipped_certs: + parts: list[str] = [] + if failures: + failed_ids = [f["property_id"] for f in failures] + # Persisted verbatim into the subtask's outputs.error (via + # SubTask.fail): include each property's error type + message, + # not just the IDs, so failed runs are diagnosable without + # cross-referencing CloudWatch. + parts.append( + f"failed property_ids: {failed_ids}; " + f"details: {json.dumps(failures)}" ) - raise RuntimeError(message) + if skipped_certs: + parts.append( + f"skipped_unmappable_cohort_certs: {json.dumps(skipped_certs)}" + ) + raise RuntimeError("; ".join(parts)) - return {"skipped_unmappable_cohort_certs": skipped_certs} if skipped_certs else None + return None finally: read_session.close() diff --git a/scripts/trigger_modelling_e2e_sqs.py b/scripts/trigger_modelling_e2e_sqs.py index f0eaed38..76105422 100644 --- a/scripts/trigger_modelling_e2e_sqs.py +++ b/scripts/trigger_modelling_e2e_sqs.py @@ -20,9 +20,10 @@ PORTFOLIO_ID: int = 796 SCENARIO_ID: int = 1268 SQS_QUEUE_NAME: str = "modelling_e2e-queue-dev" -# Number of postcodes to process this run (postcodes where all properties are -# already completed are skipped and do not count toward this limit). -POSTCODES_LIMIT: int = 1000 +# Max number of properties to process this run (cost cap). Postcodes are added +# whole, smallest-group-first, until adding the next would exceed this — so the +# actual count lands at or just under the limit. +PROPERTIES_LIMIT: int = 5000 # True → Lambda runs the full pipeline but skips all DB writes (safe for testing). DRY_RUN: bool = False @@ -94,21 +95,31 @@ def _completed_property_ids() -> set[int]: def main() -> None: postcode_map = _load_postcode_map() - completed = _completed_property_ids() - logger.info(f"{len(completed)} property IDs already completed — skipping") + # Pending filter disabled — re-run every property regardless of whether it + # already has a completed modelling_e2e sub_task for this scenario. batches: list[tuple[str, list[int]]] = [] for postcode, ids in postcode_map.items(): - pending = [pid for pid in ids if pid not in completed] - if pending: - batches.append((postcode, pending)) + if ids: + batches.append((postcode, ids)) - to_process = batches[:POSTCODES_LIMIT] + to_process: list[tuple[str, list[int]]] = [] + property_count = 0 + for postcode, ids in batches: + if property_count + len(ids) > PROPERTIES_LIMIT: + continue + to_process.append((postcode, ids)) + property_count += len(ids) if not to_process: logger.info("Nothing left to process.") return + logger.info( + f"selected {len(to_process)} postcodes / {property_count} properties " + f"(limit {PROPERTIES_LIMIT})" + ) + sqs: Any = cast( Any, boto3.client("sqs", region_name="eu-west-2") ) # pyright: ignore[reportUnknownMemberType]