survey-extraction/deployment/lambda/walthamforest_etl/docker/app.py

380 lines
No EOL
13 KiB
Python

import pandas as pd
import json
from pprint import pprint
import os
import copy
from collections import defaultdict
from typing import List, Dict, Any, Union, Optional
import boto3
from urllib.parse import urlparse
from decent_homes_pilot import decent_homes_calc
import uuid
from datetime import datetime, timezone, time, date
from decimal import Decimal
from sqlmodel import select
from sqlalchemy import update
from etl.models.topLevel import uploaded_files, ReportType
from etl.db.db import get_db_session, init_db
def process_complex(sheet_name, group_key="ADDRESS",
                    path="../../../../../home/Downloads/data.xlsx", df=None):
    """Group an asset-data sheet into one record per value of *group_key*.

    Args:
        sheet_name: sheet to read from the workbook at *path* (ignored when
            *df* is supplied).
        group_key: column to group by, e.g. ``"ADDRESS"`` for per-property
            sheets or ``"BLOCK_CODE"`` for block-level sheets.
        path: workbook location; defaults to the original hard-coded path.
        df: an already-loaded DataFrame to use instead of reading *path*.

    Returns:
        A list (in first-appearance order of the key) of dicts::

            {group_key: <key value>,
             "property_info": {<first distinct row of the property columns>},
             "elements": {<ELEMENT CODE DESCRIPTION>: {<element columns>}}}

        When two element rows share a description, the later one overwrites
        the earlier one. Rows whose *group_key* is missing (NaN/None) are
        skipped.
    """
    if df is None:
        df = pd.read_excel(path, sheet_name=sheet_name)

    element_cols = [
        "ELEMENT GROUP", "ELEMENT CODE", "ELEMENT CODE DESCRIPTION",
        "ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION",
        "ELEMENT DATE VALUE", "ELEMENT NUMERIC VALUE",
        "ELEMENT TEXT VALUE", "QUANTITY",
        "INSTALL DATE", "REMAINING LIFE", "ELEMENT COMMENTS",
    ]
    property_cols = [
        "PROP REF", "ADDRESS", "OWNERSHIP",
        "PROP STATUS", "PROP TYPE", "PROP SUB TYPE",
    ]

    records = []
    # One pass over the frame instead of a boolean mask per key. sort=False
    # keeps the first-appearance order that unique() gave; dropna=True skips
    # rows with no key, which previously produced an empty subset and crashed
    # on .iloc[0].
    for val, group in df.groupby(group_key, sort=False, dropna=True, observed=True):
        property_info = group[property_cols].drop_duplicates().iloc[0].to_dict()

        elements_dict = {}
        for _, row in group[element_cols].drop_duplicates().iterrows():
            # Keyed by description; "ELEMENT CODE" would also be a candidate.
            elements_dict[row["ELEMENT CODE DESCRIPTION"]] = row.to_dict()

        records.append({
            group_key: val,
            "property_info": property_info,
            "elements": elements_dict,
        })
    return records
def process_simple(sheet_name, path="../../../../../home/Downloads/data.xlsx", df=None):
    """Turn a one-row-per-address sheet into ``{"ADDRESS", "to_add"}`` records.

    Args:
        sheet_name: sheet to read from the workbook at *path* (ignored when
            *df* is supplied).
        path: workbook location; defaults to the original hard-coded path.
        df: an already-loaded DataFrame to use instead of reading *path*.

    Returns:
        One dict per distinct ``Address`` value, in first-appearance order:
        ``{"ADDRESS": <address>, "to_add": {<every other column>: value}}``.
        When an address has several rows, only its first row is used. Rows
        with a missing ``Address`` are skipped.
    """
    if df is None:
        df = pd.read_excel(path, sheet_name=sheet_name)

    # First row per address in a single pass. Missing addresses are dropped
    # up front: matching them with == never succeeds and used to crash on
    # .iloc[0].
    first_rows = (
        df.dropna(subset=["Address"])
        .drop_duplicates(subset="Address", keep="first")
    )
    return [
        {"ADDRESS": row["Address"], "to_add": row.drop(labels=["Address"]).to_dict()}
        for _, row in first_rows.iterrows()
    ]
def combine_records_by_address(
    asset_records: List[Dict[str, Any]],
    simple_records: List[Dict[str, Any]],
    dest_key: str = "to_add",
    unique_identifier="Address"
) -> List[Dict[str, Any]]:
    """Join asset records with simple records that share the same ``ADDRESS``.

    Args:
        asset_records: dicts carrying an ``"ADDRESS"`` key (e.g. the output
            of ``process_complex``).
        simple_records: dicts with ``"ADDRESS"`` and ``"to_add"`` keys (e.g.
            the output of ``process_simple``).
        dest_key: key under which a matching simple record's ``"to_add"``
            payload is stored in the merged record.
        unique_identifier: not used by this function; kept for signature
            compatibility.

    Returns:
        One record per address found in either input, sorted by address.
        Each is a deep copy of the asset record (or ``{"ADDRESS": addr}`` when
        there is none), plus ``dest_key`` when a simple record matched. If an
        address repeats within one input, the later record wins.
    """
    assets_index = {}
    for rec in asset_records:
        assets_index[rec["ADDRESS"]] = rec

    simple_index = {}
    for rec in simple_records:
        simple_index[rec["ADDRESS"]] = rec

    result: List[Dict[str, Any]] = []
    for address in sorted(assets_index.keys() | simple_index.keys()):
        if address in assets_index:
            # Deep copy so the caller's asset records are never mutated.
            merged_record = copy.deepcopy(assets_index[address])
        else:
            merged_record = copy.deepcopy({"ADDRESS": address})

        match = simple_index.get(address)
        if match:
            merged_record[dest_key] = match.get("to_add", {})
        result.append(merged_record)
    return result
def combine_records_for_flats(assets: list, simple: list) -> list:
    """Merge block-level elements into every flat record.

    Args:
        assets: flat records, each a dict with an ``"elements"`` dict (as
            built by ``process_complex``).
        simple: block records; only ``simple[0]`` is used.

    Returns:
        *assets* itself, mutated in place. Each flat gains every element of
        the block record whose description it does not already have; the
        flat's own elements take precedence. *assets* is returned unchanged
        when *simple* is empty or its first item is not a dict.
    """
    if not simple or not isinstance(simple[0], dict):
        return assets  # nothing to add

    block_elements = simple[0].get("elements", {})
    for record in assets:
        flat_elements = record["elements"]
        for description, element in block_elements.items():
            if description not in flat_elements:
                # Copy so the flats don't all share one mutable dict object.
                flat_elements[description] = copy.deepcopy(element)
    return assets
def _json_default(o):
    """``default=`` hook for ``json.dump(s)``: convert values json can't encode.

    Conversions:
        datetime / date / time (incl. ``pd.Timestamp``) -> ISO 8601 string
        Decimal                                         -> float
        set / frozenset                                 -> list
        numpy bool / integer / floating scalar          -> bool / int / float
        numpy ndarray                                   -> list
        anything else                                   -> ``str(o)``
    """
    # pd.Timestamp subclasses datetime, so it is handled here as well.
    if isinstance(o, (datetime, date, time)):
        return o.isoformat()
    # Decimal -> float (switch to str if exactness matters).
    if isinstance(o, Decimal):
        return float(o)
    if isinstance(o, (set, frozenset)):
        return list(o)
    try:
        import numpy as np
    except ImportError:
        np = None
    if np is not None:
        # np.bool_ is neither a Python bool nor an np.integer; without this
        # branch it fell through to str() and serialized as "True"/"False".
        if isinstance(o, np.bool_):
            return bool(o)
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o)
        if isinstance(o, np.ndarray):
            return o.tolist()
    # Last resort: string representation.
    return str(o)
def uprn_to_address(path="../../../../../home/Downloads/data.xlsx", df=None):
    """Return a dict mapping each ``Address`` to its ``UPRN``.

    Despite the function name, keys are addresses and values are UPRNs.

    Args:
        path: workbook to read the ``"All Energy Breakdown "`` sheet from
            (note the trailing space in the sheet name); ignored when *df*
            is given.
        df: an already-loaded copy of that sheet, to avoid re-reading the
            workbook.

    If an address appears more than once, the last row's UPRN wins.
    """
    if df is None:
        df = pd.read_excel(path, sheet_name="All Energy Breakdown ")
    return df.set_index("Address")["UPRN"].to_dict()
def stories_to_address(path="../../../../../home/Downloads/data.xlsx", df=None):
    """Return a dict mapping each ``Address`` to its ``Storeys`` value.

    Despite the function name, keys are addresses and values are storeys.

    Args:
        path: workbook to read the ``"All Energy Breakdown "`` sheet from
            (note the trailing space in the sheet name); ignored when *df*
            is given.
        df: an already-loaded copy of that sheet, to avoid re-reading the
            workbook.

    If an address appears more than once, the last row's value wins.
    """
    if df is None:
        df = pd.read_excel(path, sheet_name="All Energy Breakdown ")
    return df.set_index("Address")["Storeys"].to_dict()
def parse_s3_uri(uri: str):
    """Split an S3 URI or HTTPS S3 URL into ``(bucket, key)``.

    Supported forms:
        - ``s3://bucket-name/path/to/file``
        - ``https://bucket-name.s3.region.amazonaws.com/path/to/file``
          (virtual-hosted; also ``bucket.s3.amazonaws.com``,
          ``bucket.s3-region.amazonaws.com``, dualstack hosts, and bucket
          names containing dots)
        - ``https://s3.region.amazonaws.com/bucket-name/path/to/file``
          (path-style)
        - any ``https://<bucket>.s3.<...>`` host (original fallback rule,
          e.g. for S3-compatible endpoints)

    The key is returned as it appears in the URL path, without the leading
    ``/`` and without percent-decoding.

    Raises:
        ValueError: unsupported scheme, unrecognised HTTPS host, or a
            missing bucket name.
    """
    import re

    parsed = urlparse(uri)
    if parsed.scheme == "s3":
        bucket = parsed.netloc
        key = parsed.path.lstrip("/")
        if not bucket:
            raise ValueError("S3 URI is missing a bucket name")
    elif parsed.scheme in ("http", "https"):
        host = parsed.netloc
        path = parsed.path.lstrip("/")
        # Greedy bucket group binds to the LAST ".s3." / ".s3-" label, so
        # dotted bucket names (which may themselves contain "s3") parse correctly.
        virtual_hosted = re.match(
            r"^(?P<bucket>.+)\.s3[.-](?:[a-z0-9-]+\.)*amazonaws\.com(?:\.cn)?$",
            host,
            re.IGNORECASE,
        )
        path_style = re.match(
            r"^s3(?:[.-][a-z0-9-]+)*\.amazonaws\.com(?:\.cn)?$",
            host,
            re.IGNORECASE,
        )
        if virtual_hosted:
            bucket = virtual_hosted.group("bucket")
            key = path
        elif path_style:
            # Bucket is the first path segment.
            bucket, _, key = path.partition("/")
            if not bucket:
                raise ValueError("Not a valid S3 HTTPS URL format")
        else:
            # Original rule, kept for non-AWS hosts: "<bucket>.s3.<anything>".
            host_parts = host.split(".")
            if len(host_parts) >= 3 and host_parts[1] == "s3":
                bucket = host_parts[0]
                key = path
            else:
                raise ValueError("Not a valid S3 HTTPS URL format")
    else:
        raise ValueError("Unsupported URI scheme")
    return bucket, key
def upload_json_to_s3(json_obj, dest_uri: str, location="decent_homes/raw_data") -> str:
    """Serialize *json_obj* as JSON and upload it beside *dest_uri* in S3.

    The object is written to
    ``<dirname(key of dest_uri)>/<location>/jsonified/<UTC %Y%m%d_%H%M%S>.json``
    in the bucket named by *dest_uri* (an ``s3://`` URI or HTTPS S3 URL, as
    accepted by ``parse_s3_uri``). Values json cannot encode natively are
    converted by ``_json_default``.

    Credentials are resolved by boto3's default credential chain (e.g. the
    Lambda execution role, environment variables, or a shared config file),
    never from source code.

    Returns:
        The HTTPS-style S3 URL of the uploaded object. The object is still
        private if the bucket is private.
    """
    bucket, pdf_key = parse_s3_uri(dest_uri)
    base_folder = os.path.dirname(pdf_key)  # e.g. ".../report"

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    # Join only non-empty parts, so a dest_uri at the bucket root doesn't
    # yield a key beginning with "/".
    json_key = "/".join(
        part
        for part in (base_folder, location, "jsonified", f"{timestamp}.json")
        if part
    )

    # SECURITY: an access key id and secret used to be hard-coded here. Treat
    # them as leaked: rotate/revoke them in IAM, and rely on the default
    # credential chain instead.
    aws_region = "eu-west-2"
    s3 = boto3.client("s3", region_name=aws_region)

    body = json.dumps(json_obj, ensure_ascii=False, indent=2, default=_json_default).encode("utf-8")
    s3.put_object(
        Bucket=bucket,
        Key=json_key,
        Body=body,
        ContentType="application/json",
        # Optional hardening: ServerSideEncryption="AES256",
    )
    return f"https://{bucket}.s3.{aws_region}.amazonaws.com/{json_key}"
def generate_file_uri(UPRN, base_url="https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents"):
    """Return the per-property document folder URL ``<base_url>/<UPRN>/``.

    The trailing slash matters: ``upload_json_to_s3`` takes the dirname of
    this URL's key, so uploads land under ``.../<UPRN>/...``.

    Args:
        UPRN: property identifier; formatted with ``str()``.
        base_url: folder that holds the per-property folders (a trailing
            ``/`` is ignored). Defaults to the dev assessments bucket.
    """
    return f"{base_url.rstrip('/')}/{UPRN}/"
def create_or_update_uploaded_file_entry(
    db_session,
    uprn: str,
    doc_type: ReportType,
    json_uri: str,
    s3_file_uri: str
):
    """Upsert the ``uploaded_files`` row identified by ``(uprn, doc_type)``.

    If a matching row exists, its ``s3_json_uri``, ``s3_file_uri`` and
    ``s3_json_upload_timestamp`` (current UTC time) are overwritten;
    otherwise a new row is added with those values. ``uprn`` is stored and
    matched as ``str(uprn)``.

    The session is committed and the object refreshed before it is returned.
    ``.one_or_none()`` raises if more than one row matches.
    """
    uprn_key = str(uprn)
    record = (
        db_session.query(uploaded_files)
        .filter(uploaded_files.uprn == uprn_key, uploaded_files.doc_type == doc_type)
        .one_or_none()
    )

    if record is None:
        record = uploaded_files(
            doc_type=doc_type,
            s3_json_uri=json_uri,
            s3_json_upload_timestamp=datetime.now(timezone.utc),
            s3_file_uri=s3_file_uri,
            uprn=uprn_key,
        )
        db_session.add(record)
    else:
        record.s3_json_uri = json_uri
        record.s3_json_upload_timestamp = datetime.now(timezone.utc)
        record.s3_file_uri = s3_file_uri

    db_session.commit()
    db_session.refresh(record)
    return record
def _lowercase_keys(mapping):
    """Copy *mapping* with string keys lower-cased; non-string keys (e.g. NaN) are dropped."""
    return {key.lower(): value for key, value in mapping.items() if isinstance(key, str)}


def _local_output_dir():
    """Create and return the directory for JSON files handed to decent_homes_calc.

    Inside AWS Lambda (detected via AWS_LAMBDA_FUNCTION_NAME) only the temp
    directory is writable, so a folder under it is used there; elsewhere the
    original ./output folder is kept.
    """
    import tempfile

    if "AWS_LAMBDA_FUNCTION_NAME" in os.environ:
        path = os.path.join(tempfile.gettempdir(), "output")
    else:
        path = "output"
    os.makedirs(path, exist_ok=True)
    return path


def _attach_uprns(records, uprn_lookup, storeys_lookup=None):
    """Set record["UPRN"] (and, if *storeys_lookup* is given, property_info["FLAT LEVEL"]).

    The match key is the lower-cased text before the first comma of the
    record's ADDRESS. Both lookups must already have lower-cased keys.
    Records with no match (or a non-string ADDRESS) are left unchanged.
    """
    for record in records:
        address = record.get("ADDRESS")
        if not isinstance(address, str):
            continue
        key = address.split(",")[0].lower()
        if key not in uprn_lookup:
            continue
        record["UPRN"] = uprn_lookup[key]
        if storeys_lookup is not None:
            # Both lookups come from the same sheet, so their keys coincide.
            record["property_info"]["FLAT LEVEL"] = storeys_lookup[key]


def _process_record(record, output_dir):
    """Upload one record, run decent_homes_calc on it and register both outputs in the DB."""
    uprn = record.get("UPRN")
    if uprn is None or pd.isna(uprn):
        # Skip instead of aborting the whole run; a blank UPRN would otherwise
        # be filed under ".../documents/nan/".
        print(f"Skipping {record.get('ADDRESS')!r}: no UPRN match")
        return
    print(uprn)

    folder_uri = generate_file_uri(uprn)
    raw_uri = upload_json_to_s3(record, folder_uri, location="decent_homes/raw_data")

    # decent_homes_calc takes a file path, so the record is written locally first.
    filepath = os.path.join(output_dir, f"{uprn}.json")
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(record, f, indent=2, ensure_ascii=False, default=_json_default)
    property_decent_home, decent_home_meta = decent_homes_calc(filepath)

    # Same order as before: upload an output, then register it, then the next.
    outputs = (
        (property_decent_home, "decent_homes/property_decent_home", ReportType.DECENT_HOMES_SUMMARY),
        (decent_home_meta, "decent_homes/decent_homes_meta", ReportType.DECENT_HOMES_PROPERTY_META),
    )
    for payload, location, doc_type in outputs:
        json_uri = upload_json_to_s3(payload, folder_uri, location=location)
        with get_db_session() as session:
            create_or_update_uploaded_file_entry(
                db_session=session,
                uprn=uprn,
                doc_type=doc_type,
                json_uri=json_uri,
                s3_file_uri=raw_uri,
            )


def handler(event, context):
    """AWS Lambda entry point for the Waltham Forest decent-homes ETL.

    Steps:

    1. Read the houses sheets and the Chingford Road flats sheets.
    2. Attach a UPRN to each record by matching the text before the first
       comma of its ADDRESS, case-insensitively, against the
       "All Energy Breakdown " sheet. Flats also get property_info["FLAT LEVEL"].
    3. For every record with a UPRN:
       a. upload the raw record JSON to S3;
       b. run ``decent_homes_calc`` on a local copy;
       c. upload both results;
       d. upsert their ``uploaded_files`` rows (DECENT_HOMES_SUMMARY and
          DECENT_HOMES_PROPERTY_META), each linked to the raw-data URI.

    Records without a UPRN match are reported and skipped instead of aborting
    the run. ``event`` and ``context`` are not used.
    """
    # Case-insensitive lookups built once. The old code checked membership
    # case-insensitively but indexed with .upper(), which raised KeyError
    # for non-uppercase sheet keys.
    uprn_lookup = _lowercase_keys(uprn_to_address())
    storeys_lookup = _lowercase_keys(stories_to_address())
    output_dir = _local_output_dir()

    # Houses: per-property asset elements merged with the "Houses" sheet data.
    assets = process_complex("Houses Asset Data")
    simple = process_simple("Houses")
    houses = combine_records_by_address(assets, simple, dest_key="EPC_DATA")
    _attach_uprns(houses, uprn_lookup)
    for house in houses:
        _process_record(house, output_dir)

    # Flats: per-flat elements, topped up with the block's elements.
    assets = process_complex("Chingford Rd 236-256 Properties")
    simple = process_complex("CHINGFORD ROAD 236-254 Asset Bl", "BLOCK_CODE")
    flats = combine_records_for_flats(assets, simple)
    _attach_uprns(flats, uprn_lookup, storeys_lookup)
    for flat in flats:
        _process_record(flat, output_dir)
# TODO: keep track of the saved file paths.
# TODO:
# [Jun-te] Spec of the questions we have for Waltham Forest
# [Khalim] A document describing our data mapping and our understanding of the data