mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-30 13:10:47 +00:00
Merge pull request #1350 from Hestia-Homes/modelling-e2e-12-concurrency
Decrease e2e modelling concurrency from 32 to 12
This commit is contained in:
commit
b426940136
3 changed files with 18 additions and 19 deletions
|
|
@ -231,7 +231,7 @@ def _get_engine() -> Engine:
|
|||
# everything up front through one short-lived read Session, closes it,
|
||||
# then writes each Property in a sequential Unit of Work — and the Unit of
|
||||
# Work resolves overrides on its own session — so no two Sessions overlap
|
||||
# and a single connection suffices. 32 concurrent containers × 1 = 32
|
||||
# and a single connection suffices. 12 concurrent containers × 1 = 12
|
||||
# against RDS.
|
||||
#
|
||||
# NullPool, not a fixed pool, enforces that as a *graceful* ceiling rather
|
||||
|
|
@ -408,7 +408,9 @@ def _predict_epc(
|
|||
orchestrator_cm=_shared_engine_orchestrator,
|
||||
pass_task_orchestrator=True,
|
||||
)
|
||||
def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator, task_id: UUID) -> None:
|
||||
def handler(
|
||||
body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator, task_id: UUID
|
||||
) -> None:
|
||||
trigger = ModellingE2ETriggerBody.model_validate(body)
|
||||
property_ids = trigger.property_ids
|
||||
portfolio_id = trigger.portfolio_id
|
||||
|
|
@ -510,9 +512,7 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
|
|||
)
|
||||
epc_repo = EpcPostgresRepository(read_session)
|
||||
stored_lodged_epcs: dict[int, EpcPropertyData] = (
|
||||
epc_repo.get_for_properties(property_ids)
|
||||
if not refetch_epc
|
||||
else {}
|
||||
epc_repo.get_for_properties(property_ids) if not refetch_epc else {}
|
||||
)
|
||||
stored_predicted_epcs: dict[int, EpcPropertyData] = (
|
||||
epc_repo.get_predicted_for_properties(property_ids)
|
||||
|
|
@ -535,9 +535,7 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
|
|||
|
||||
spatial = _spatial_for(geospatial, uprn)
|
||||
restrictions = (
|
||||
spatial.restrictions
|
||||
if spatial is not None
|
||||
else PlanningRestrictions()
|
||||
spatial.restrictions if spatial is not None else PlanningRestrictions()
|
||||
)
|
||||
coordinates: Optional[Coordinates] = (
|
||||
spatial.coordinates if spatial is not None else None
|
||||
|
|
@ -547,10 +545,14 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
|
|||
if refetch_epc:
|
||||
epc: Optional[EpcPropertyData] = epc_client.get_by_uprn(uprn)
|
||||
elif stored_lodged is not None:
|
||||
logger.info(f"property={pid} using stored lodged EPC (refetch_epc=False)")
|
||||
logger.info(
|
||||
f"property={pid} using stored lodged EPC (refetch_epc=False)"
|
||||
)
|
||||
epc = stored_lodged
|
||||
else:
|
||||
epc = None # no stored lodged EPC; prediction path handles this property
|
||||
epc = (
|
||||
None # no stored lodged EPC; prediction path handles this property
|
||||
)
|
||||
overrides = overlays_from(overrides_reader.overrides_for(pid))
|
||||
predicted_epc: Optional[EpcPropertyData] = None
|
||||
|
||||
|
|
@ -567,9 +569,7 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
|
|||
landlord_overrides=overrides,
|
||||
).effective_epc
|
||||
else:
|
||||
logger.info(
|
||||
f"property={pid} no lodged EPC — attempting prediction"
|
||||
)
|
||||
logger.info(f"property={pid} no lodged EPC — attempting prediction")
|
||||
stored_predicted = stored_predicted_epcs.get(pid)
|
||||
if not repredict_epc and stored_predicted is not None:
|
||||
logger.info(
|
||||
|
|
@ -620,8 +620,7 @@ def handler(body: dict[str, Any], context: Any, orchestrator: TaskOrchestrator,
|
|||
print_table=False,
|
||||
)
|
||||
logger.info(
|
||||
f"property={pid} modelling complete "
|
||||
f"measures={len(plan.measures)}"
|
||||
f"property={pid} modelling complete " f"measures={len(plan.measures)}"
|
||||
)
|
||||
|
||||
if dry_run:
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ variable "reserved_concurrent_executions" {
|
|||
|
||||
variable "maximum_concurrency" {
|
||||
type = number
|
||||
default = 32
|
||||
default = 12
|
||||
description = "Maximum concurrent Lambda invocations from the SQS trigger."
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -43,15 +43,15 @@ COMPLETED_SINCE: datetime | None = datetime(
|
|||
DRY_RUN: bool = False
|
||||
|
||||
# False → Lambda skips the Google Solar fetch (re-uses stored Solar data).
|
||||
REFETCH_SOLAR: bool = True
|
||||
REFETCH_SOLAR: bool = False
|
||||
|
||||
# False → use stored lodged EPC for properties that have one; properties with no
|
||||
# stored lodged EPC are treated as EPC-less and routed to prediction (no API call).
|
||||
REFETCH_EPC: bool = True
|
||||
REFETCH_EPC: bool = False
|
||||
|
||||
# False → use stored predicted EPC for EPC-less properties that have one; live
|
||||
# prediction still runs when no stored predicted EPC exists for the property.
|
||||
REPREDICT_EPC: bool = True
|
||||
REPREDICT_EPC: bool = False
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue