mirror of
https://github.com/Hestia-Homes/survey-extraction.git
synced 2026-06-08 11:17:29 +00:00
380 lines
No EOL
13 KiB
Python
380 lines
No EOL
13 KiB
Python
import pandas as pd
|
|
import json
|
|
from pprint import pprint
|
|
import os
|
|
import copy
|
|
from collections import defaultdict
|
|
from typing import List, Dict, Any, Union, Optional
|
|
import boto3
|
|
from urllib.parse import urlparse
|
|
from decent_homes_pilot import decent_homes_calc
|
|
import uuid
|
|
from datetime import datetime, timezone, time, date
|
|
from decimal import Decimal
|
|
from sqlmodel import select
|
|
from sqlalchemy import update
|
|
from etl.models.topLevel import uploaded_files, ReportType
|
|
from etl.db.db import get_db_session, init_db
|
|
|
|
|
|
def process_complex(sheet_name, group_key="ADDRESS", df: Optional[pd.DataFrame] = None):
    """Group an asset sheet into one record per unique ``group_key`` value.

    Args:
        sheet_name: Worksheet to read from the source workbook. Ignored when
            ``df`` is supplied.
        group_key: Column whose unique values define the groups (the callers
            in this file use "ADDRESS" and "BLOCK_CODE").
        df: Optional pre-loaded frame. When given, the workbook is not read,
            which lets callers reuse an already-loaded sheet.

    Returns:
        A list of dicts shaped like
        ``{group_key: value, "property_info": {...}, "elements": {...}}``.
        ``property_info`` comes from the first row of the group; ``elements``
        is keyed by "ELEMENT CODE DESCRIPTION", so when a description repeats
        within a group the last distinct row wins.
    """
    if df is None:
        df = pd.read_excel("../../../../../home/Downloads/data.xlsx", sheet_name=sheet_name)

    element_cols = [
        "ELEMENT GROUP", "ELEMENT CODE", "ELEMENT CODE DESCRIPTION",
        "ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION",
        "ELEMENT DATE VALUE", "ELEMENT NUMERIC VALUE",
        "ELEMENT TEXT VALUE", "QUANTITY",
        "INSTALL DATE", "REMAINING LIFE", "ELEMENT COMMENTS",
    ]

    property_cols = [
        "PROP REF", "ADDRESS", "OWNERSHIP",
        "PROP STATUS", "PROP TYPE", "PROP SUB TYPE",
    ]

    records = []

    # dropna(): a blank cell in the group column appears as NaN in unique().
    # NaN never compares equal to itself, so the subset below would be empty
    # and .iloc[0] would raise IndexError.
    for val in df[group_key].dropna().unique():
        g = df[df[group_key] == val]

        # First row of the group carries the property-level attributes.
        property_info = g[property_cols].iloc[0].to_dict()

        # One entry per distinct element row, keyed by its description.
        elements_dict = {
            row["ELEMENT CODE DESCRIPTION"]: row.to_dict()
            for _, row in g[element_cols].drop_duplicates().iterrows()
        }

        records.append({
            group_key: val,
            "property_info": property_info,
            "elements": elements_dict,
        })

    return records
|
|
|
|
def process_simple(sheet_name, df: Optional[pd.DataFrame] = None):
    """Collapse a flat sheet into one record per unique "Address".

    Args:
        sheet_name: Worksheet to read from the source workbook. Ignored when
            ``df`` is supplied.
        df: Optional pre-loaded frame. When given, the workbook is not read.

    Returns:
        A list of ``{"ADDRESS": address, "to_add": {...}}`` dicts, where
        ``to_add`` holds every column except "Address" taken from the first
        row found for that address.
    """
    if df is None:
        df = pd.read_excel("../../../../../home/Downloads/data.xlsx", sheet_name=sheet_name)

    records = []

    # dropna(): a blank Address cell appears as NaN in unique(). NaN never
    # equals itself, so the subset would be empty and .iloc[0] would raise
    # IndexError.
    for address in df["Address"].dropna().unique():
        # Only the first row for an address is used when there are several.
        row = df[df["Address"] == address].iloc[0]

        records.append({
            "ADDRESS": address,
            "to_add": row.drop(labels=["Address"]).to_dict(),
        })

    return records
|
|
|
|
|
|
def combine_records_by_address(
    asset_records: List[Dict[str, Any]],
    simple_records: List[Dict[str, Any]],
    dest_key: str = "to_add",
    unique_identifier="Address"
) -> List[Dict[str, Any]]:
    """
    Join asset records and simple records on their "ADDRESS" value.

    Each output entry is a deep copy of the asset record for that address (or
    a bare ``{"ADDRESS": addr}`` when only the simple side knows the address).
    When a simple record exists, its "to_add" dict is stored under
    ``dest_key``. Entries are returned sorted by address.

    ``unique_identifier`` is accepted but not used by this function.
    """
    # Later records overwrite earlier ones that share an address.
    assets_index: Dict[Any, Dict[str, Any]] = {}
    for rec in asset_records:
        assets_index[rec["ADDRESS"]] = rec

    simple_index: Dict[Any, Dict[str, Any]] = {}
    for rec in simple_records:
        simple_index[rec["ADDRESS"]] = rec

    combined: List[Dict[str, Any]] = []

    # Walk every address seen on either side, in sorted order.
    for address in sorted(assets_index.keys() | simple_index.keys()):
        if address in assets_index:
            entry = copy.deepcopy(assets_index[address])
        else:
            entry = {"ADDRESS": address}

        extra = simple_index.get(address)
        if extra:
            entry[dest_key] = extra.get("to_add", {})

        combined.append(entry)

    return combined
|
|
|
|
def combine_records_for_flats(
    assets: List[Dict[str, Any]],
    simple: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Back-fill each flat's elements with block-level elements it lacks.

    The block record is ``simple[0]``. For every element description in the
    block's "elements" that a flat record does not already have, a copy of the
    block's entry is added to that flat. Elements already present on the flat
    are never overwritten.

    Args:
        assets: Flat records, each expected to carry an "elements" dict. A
            record without one gets an empty dict first. Mutated in place.
        simple: Block-level records; only the first one is used.

    Returns:
        The same ``assets`` list that was passed in. It is returned untouched
        when ``simple`` is empty or its first item is not a dict.
    """
    if not simple or not isinstance(simple[0], dict):
        return assets  # nothing to add

    # A block record without "elements" contributes nothing instead of raising.
    block_elements = simple[0].get("elements") or {}

    for record in assets:
        own_elements = record.setdefault("elements", {})
        for ele_desc, details in block_elements.items():
            if ele_desc not in own_elements:
                # Copy so flats do not all share one mutable dict per element.
                own_elements[ele_desc] = copy.deepcopy(details)

    return assets
|
|
|
|
def _json_default(o):
|
|
# datetimes → ISO 8601 strings
|
|
if isinstance(o, (datetime, date, time)):
|
|
return o.isoformat()
|
|
# decimals → float (or str if you need exactness)
|
|
if isinstance(o, Decimal):
|
|
return float(o)
|
|
# sets → lists
|
|
if isinstance(o, set):
|
|
return list(o)
|
|
# numpy/pandas types (optional)
|
|
try:
|
|
import numpy as np
|
|
import pandas as pd
|
|
if isinstance(o, (np.integer,)):
|
|
return int(o)
|
|
if isinstance(o, (np.floating,)):
|
|
return float(o)
|
|
if isinstance(o, (np.ndarray,)):
|
|
return o.tolist()
|
|
if isinstance(o, (pd.Timestamp,)):
|
|
return o.isoformat()
|
|
except Exception:
|
|
pass
|
|
# last resort: string
|
|
return str(o)
|
|
|
|
def uprn_to_address():
    """Build a lookup of Address -> UPRN from the "All Energy Breakdown " sheet.

    Despite the function name, the returned dict is keyed by the "Address"
    column and its values come from the "UPRN" column.
    """
    workbook_path = "../../../../../home/Downloads/data.xlsx"
    # The sheet name is passed with its trailing space, exactly as written.
    sheet = pd.read_excel(workbook_path, sheet_name="All Energy Breakdown ")
    uprn_by_address = sheet.set_index('Address')['UPRN']
    return uprn_by_address.to_dict()
|
|
|
|
def stories_to_address():
    """Build a lookup of Address -> Storeys from the "All Energy Breakdown " sheet.

    Despite the function name, the returned dict is keyed by the "Address"
    column and its values come from the "Storeys" column.
    """
    workbook_path = "../../../../../home/Downloads/data.xlsx"
    # The sheet name is passed with its trailing space, exactly as written.
    sheet = pd.read_excel(workbook_path, sheet_name="All Energy Breakdown ")
    storeys_by_address = sheet.set_index('Address')['Storeys']
    return storeys_by_address.to_dict()
|
|
|
|
def parse_s3_uri(uri: str):
    """
    Split an S3 location into ``(bucket, key)``.

    Accepted forms:
      - s3://bucket-name/path/to/file
      - http(s)://bucket-name.s3.<rest-of-host>/path/to/file

    For HTTP(S) URLs the host must have at least three dot-separated labels
    with "s3" as the second one; the first label is taken as the bucket.
    The key is the URL path with leading slashes removed.

    Raises:
        ValueError: for any other scheme, or an HTTP(S) host that does not
            match the expected shape.
    """
    parts = urlparse(uri)
    scheme = parts.scheme

    if scheme not in ("s3", "http", "https"):
        raise ValueError("Unsupported URI scheme")

    if scheme == "s3":
        # s3://bucket/key -> the network location is the bucket itself.
        return parts.netloc, parts.path.lstrip("/")

    # Virtual-hosted style: bucket-name.s3.region.amazonaws.com
    labels = parts.netloc.split(".")
    if len(labels) < 3 or labels[1] != "s3":
        raise ValueError("Not a valid S3 HTTPS URL format")

    return labels[0], parts.path.lstrip("/")
|
|
|
|
def upload_json_to_s3(json_obj, dest_uri: str, location="decent_homes/raw_data") -> str:
    """
    Serialize ``json_obj`` to JSON and upload it to S3 next to ``dest_uri``.

    The object key is built as
    ``<dirname of dest_uri's key>/<location>/jsonified/<UTC timestamp>.json``
    in the bucket parsed from ``dest_uri``.

    Args:
        json_obj: Object to serialize; values the json module cannot handle
            are converted by ``_json_default``.
        dest_uri: ``s3://`` or HTTPS S3 URL whose bucket and folder anchor
            the upload location.
        location: Sub-folder placed under the base folder.

    Returns:
        The HTTPS-style S3 URL of the uploaded object (still private if the
        bucket is private).

    Raises:
        ValueError: if ``dest_uri`` cannot be parsed by ``parse_s3_uri``.
    """
    bucket, source_key = parse_s3_uri(dest_uri)
    base_folder = os.path.dirname(source_key)  # e.g. ".../report"

    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated. The
    # formatted string is the same, with one-second resolution.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    json_key = f"{base_folder}/{location}/jsonified/{timestamp}.json"

    aws_region = "eu-west-2"

    # SECURITY: credentials must never live in source control. boto3 resolves
    # them from its default chain (environment variables, shared config or
    # credentials file, or the attached IAM role). The access key that used
    # to be embedded here should be treated as compromised and rotated.
    s3 = boto3.client("s3", region_name=aws_region)

    body = json.dumps(
        json_obj, ensure_ascii=False, indent=2, default=_json_default
    ).encode("utf-8")

    s3.put_object(
        Bucket=bucket,
        Key=json_key,
        Body=body,
        ContentType="application/json",
        # Optional hardening:
        # ServerSideEncryption="AES256",
    )

    # Return an HTTPS-style S3 URL (matches the input style).
    return f"https://{bucket}.s3.{aws_region}.amazonaws.com/{json_key}"
|
|
|
|
|
|
|
|
def generate_file_uri(UPRN):
    """Return the HTTPS S3 folder URL used for a property's documents.

    Args:
        UPRN: Property identifier interpolated into the path.

    Returns:
        ``https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents/<UPRN>/``
        including the trailing slash.
    """
    return f"https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents/{UPRN}/"
|
|
|
|
def create_or_update_uploaded_file_entry(
    db_session,
    uprn: str,
    doc_type: ReportType,
    json_uri: str,
    s3_file_uri: str
):
    """
    Upsert the ``uploaded_files`` row identified by (uprn, doc_type).

    When a matching row exists, its JSON URI, JSON upload timestamp and file
    URI are overwritten. Otherwise a new row is created and added to the
    session. The UPRN is always stored and compared as a string, and the
    timestamp is the current UTC time.

    The session is committed and the object refreshed before it is returned.
    """
    uprn_text = str(uprn)

    lookup = db_session.query(uploaded_files).filter(
        uploaded_files.uprn == uprn_text,
        uploaded_files.doc_type == doc_type,
    )
    # NOTE(review): one_or_none() is expected to raise if several rows match;
    # confirm (uprn, doc_type) is unique in the schema.
    record = lookup.one_or_none()

    if not record:
        # No row yet for this (uprn, doc_type): insert one.
        record = uploaded_files(
            doc_type=doc_type,
            s3_json_uri=json_uri,
            s3_json_upload_timestamp=datetime.now(timezone.utc),
            s3_file_uri=s3_file_uri,
            uprn=uprn_text,
        )
        db_session.add(record)
    else:
        # Row already exists: refresh its URIs and timestamp.
        record.s3_json_uri = json_uri
        record.s3_json_upload_timestamp = datetime.now(timezone.utc)
        record.s3_file_uri = s3_file_uri

    db_session.commit()
    db_session.refresh(record)
    return record
|
|
|
|
def _lowercase_keys(mapping):
    """Return a copy of ``mapping`` keyed by lower-cased string keys."""
    return {str(key).lower(): value for key, value in mapping.items()}


def _tag_with_uprn(records, uprn_by_name, storeys_by_name=None):
    """Set "UPRN" (and "FLAT LEVEL" when storeys are given) on matching records.

    The lookup name is the part of the record's "ADDRESS" before the first
    comma, lower-cased. Records whose name is unknown are left untouched.
    """
    for record in records:
        name = str(record["ADDRESS"]).split(",")[0].lower()
        if name not in uprn_by_name:
            continue
        record["UPRN"] = uprn_by_name[name]
        if storeys_by_name is not None and name in storeys_by_name:
            record["property_info"]["FLAT LEVEL"] = storeys_by_name[name]


def _publish_property(record):
    """Upload one property's raw JSON, run the decent-homes calc, register outputs."""
    uprn = record.get("UPRN")
    if uprn is None or pd.isna(uprn):
        # Without a UPRN there is no destination folder; skip this property
        # rather than abort the whole batch.
        print(f"Skipping {record.get('ADDRESS')!r}: no UPRN mapping found")
        return

    print(uprn)
    raw_uri = upload_json_to_s3(
        record, generate_file_uri(uprn), location="decent_homes/raw_data"
    )

    # decent_homes_calc takes a file path, so the record is also saved locally.
    os.makedirs("output", exist_ok=True)
    filepath = os.path.join("output", f"{uprn}.json")
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(record, f, indent=2, ensure_ascii=False, default=_json_default)

    property_decent_home, decent_home_meta = decent_homes_calc(filepath)

    outputs = (
        (property_decent_home, "decent_homes/property_decent_home",
         ReportType.DECENT_HOMES_SUMMARY),
        (decent_home_meta, "decent_homes/decent_homes_meta",
         ReportType.DECENT_HOMES_PROPERTY_META),
    )
    for payload, location, doc_type in outputs:
        derived_uri = upload_json_to_s3(
            payload, generate_file_uri(uprn), location=location
        )
        with get_db_session() as session:
            create_or_update_uploaded_file_entry(
                db_session=session,
                uprn=uprn,
                doc_type=doc_type,
                json_uri=derived_uri,
                s3_file_uri=raw_uri,
            )


def handler(event, context):
    """Run the decent-homes pipeline for the houses and flats sheets.

    For each property the raw record is uploaded to S3, saved under
    ``output/<UPRN>.json``, passed to ``decent_homes_calc``, and the two
    results are uploaded and registered in ``uploaded_files``. Properties
    whose address has no UPRN mapping are skipped with a printed message.

    Args:
        event: Unused.
        context: Unused.
    """
    # Address -> value lookups, lower-cased once so matching and lookup use
    # the same key form (the match used to be case-insensitive while the
    # lookup required an upper-case key).
    uprn_by_name = _lowercase_keys(uprn_to_address())
    storeys_by_name = _lowercase_keys(stories_to_address())

    # Houses: asset data merged with the EPC sheet by address.
    assets = process_complex("Houses Asset Data")
    simple = process_simple("Houses")
    houses = combine_records_by_address(assets, simple, dest_key="EPC_DATA")
    _tag_with_uprn(houses, uprn_by_name)
    for house in houses:
        _publish_property(house)

    # Flats: per-flat assets back-filled with block-level elements.
    assets = process_complex("Chingford Rd 236-256 Properties")
    simple = process_complex("CHINGFORD ROAD 236-254 Asset Bl", "BLOCK_CODE")
    flats = combine_records_for_flats(assets, simple)
    _tag_with_uprn(flats, uprn_by_name, storeys_by_name)
    for flat in flats:
        _publish_property(flat)
|
|
|
|
# Keep track of saved file path
|
|
|
|
|
|
|
|
# To Do:
|
|
# [Jun-te] Spec of the questions that we have for Waltham Forest
|
|
# [Khalim] A document that has our mapping and our understanding of our data |