fixed minor bug with epcsearcher and increase concurrency to 10

This commit is contained in:
Khalim Conn-Kowlessar 2026-01-04 13:29:41 +08:00
parent a5700b685d
commit e072d40fa8
3 changed files with 118 additions and 3 deletions

View file

@ -540,7 +540,7 @@ class SearchEpc:
newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)
# Ge the uprn from the newest record for this home
uprns = {r["uprn"] for r in rows if r["uprn"]}
uprns = {str(r["uprn"]) for r in rows if r["uprn"]}
# We can sometimes have no uprn for a property
if (len(uprns) == 0) and len(rows) > 0:
logger.warning("Found data but missing uprn")
@ -569,7 +569,8 @@ class SearchEpc:
f"Provided UPRN {self.uprn} does not match EPC UPRN {epc_uprn}, using provided UPRN"
)
# We overwrite but in this instance, we've likely got the wrong EPC data
newest_epc["uprn"] = self.uprn
# Insert as a string - same format as the raw data
newest_epc["uprn"] = str(self.uprn)
if self.fast:
return newest_epc, [], {}, "", "", ""

View file

@ -46,3 +46,117 @@ missed_properties.to_excel(
sheet_name="Standardised Asset List",
index=False
)
# Fixing an error - triggered jobs without removing EWI/IWI so need to delete all plans associated to these scenarios:
scenario_id = None
from sqlalchemy import select, func
from sqlalchemy.orm import Session
from backend.app.db.models.recommendations import Plan
def count_plans_for_scenario(session: Session, scenario_id: int) -> int:
return session.execute(
select(func.count())
.select_from(Plan)
.where(Plan.scenario_id == scenario_id)
).scalar_one()
with db_session() as session:
n_plans = count_plans_for_scenario(session, scenario_id)
def get_plan_ids_for_scenario(session: Session, scenario_id: int) -> list[int]:
result = session.execute(
select(Plan.id)
.where(Plan.scenario_id == scenario_id)
)
return [row.id for row in result]
with db_session() as session:
plan_ids = get_plan_ids_for_scenario(session, scenario_id)
from sqlalchemy import text
from sqlalchemy.orm import Session
def chunked(iterable, size):
for i in range(0, len(iterable), size):
yield iterable[i:i + size]
from sqlalchemy import text
from sqlalchemy.orm import Session
def delete_plan_batch(session: Session, plan_ids: list[int]):
if not plan_ids:
return
session.execute(text("SET LOCAL lock_timeout = '5s'"))
params = {"plan_ids": plan_ids}
# ----------------------------
# recommendation_materials
# ----------------------------
session.execute(
text("""
DELETE FROM recommendation_materials rm
USING plan_recommendations pr
WHERE rm.recommendation_id = pr.recommendation_id
AND pr.plan_id = ANY(:plan_ids)
"""),
params,
)
# ----------------------------
# plan_recommendations
# ----------------------------
session.execute(
text("""
DELETE FROM plan_recommendations
WHERE plan_id = ANY(:plan_ids)
"""),
params,
)
# ----------------------------
# recommendations (only those used by these plans)
# ----------------------------
session.execute(
text("""
DELETE FROM recommendation r
WHERE r.id IN (
SELECT DISTINCT recommendation_id
FROM plan_recommendations
WHERE plan_id = ANY(:plan_ids)
)
"""),
params,
)
# ----------------------------
# plans LAST
# ----------------------------
session.execute(
text("""
DELETE FROM plan
WHERE id = ANY(:plan_ids)
"""),
params,
)
batch_size = 25
total = (len(plan_ids) + batch_size - 1) // batch_size
for i, batch in enumerate(chunked(plan_ids, batch_size), start=1):
print(f"Deleting plan batch {i}/{total} ({len(batch)} plans)")
with db_session() as session:
delete_plan_batch(session, batch)
print(f"Batch {i} committed")

View file

@ -66,7 +66,7 @@ functions:
- sqs:
arn: arn:aws:sqs:${self:provider.region}:${aws:accountId}:model-engine-queue
batchSize: 1
maximumConcurrency: 7 # Heavily restricts concurrency to avoid overwhelming the ldmbda limits
maximumConcurrency: 10 # Heavily restricts concurrency to avoid overwhelming the ldmbda limits
resources: