import copy
import functools
import json
import os
import uuid
from collections import defaultdict
from datetime import date, datetime, time, timezone
from decimal import Decimal
from pprint import pprint
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse

import boto3
import numpy as np
import pandas as pd
from sqlalchemy import update
from sqlmodel import select

from decent_homes_pilot import decent_homes_calc
from etl.db.db import get_db_session, init_db
from etl.models.topLevel import ReportType, uploaded_files


# Workbook holding every input sheet. Override with DECENT_HOMES_DATA_XLSX so the
# path is not tied to one developer's home directory.
DATA_XLSX_PATH = os.environ.get(
    "DECENT_HOMES_DATA_XLSX", "../../../../../home/Downloads/data.xlsx"
)

# Sheet holding the Address -> UPRN / Storeys lookup (note the trailing space).
ENERGY_BREAKDOWN_SHEET = "All Energy Breakdown "

# Region used for the S3 client and for the HTTPS URLs this module returns.
AWS_REGION = "eu-west-2"

# Base URL under which each property's documents are stored, one folder per UPRN.
DOCUMENTS_BASE_URL = (
    "https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents"
)

# Local folder for the per-property JSON files handed to decent_homes_calc.
# NOTE(review): if this runs on AWS Lambda only /tmp is writable - set
# DECENT_HOMES_OUTPUT_DIR accordingly there.
OUTPUT_DIR = os.environ.get("DECENT_HOMES_OUTPUT_DIR", "output")

# S3 sub-folders (below the property folder) for each kind of upload.
RAW_DATA_LOCATION = "decent_homes/raw_data"
SUMMARY_LOCATION = "decent_homes/property_decent_home"
META_LOCATION = "decent_homes/decent_homes_meta"

# Per-element columns kept for each element of a property/block.
ELEMENT_COLS = [
    "ELEMENT GROUP", "ELEMENT CODE", "ELEMENT CODE DESCRIPTION",
    "ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION",
    "ELEMENT DATE VALUE", "ELEMENT NUMERIC VALUE", "ELEMENT TEXT VALUE",
    "QUANTITY", "INSTALL DATE", "REMAINING LIFE", "ELEMENT COMMENTS",
]

# Property-level columns; the first distinct row per group is kept.
PROPERTY_COLS = [
    "PROP REF", "ADDRESS", "OWNERSHIP", "PROP STATUS", "PROP TYPE", "PROP SUB TYPE",
]


def _read_sheet(sheet_name: str) -> pd.DataFrame:
    """Read one sheet of the source workbook at DATA_XLSX_PATH."""
    return pd.read_excel(DATA_XLSX_PATH, sheet_name=sheet_name)


def process_complex(sheet_name, group_key="ADDRESS"):
    """Group an asset-data sheet into one record per ``group_key`` value.

    Args:
        sheet_name: Sheet of the workbook to read.
        group_key: Column to group by (e.g. "ADDRESS" or "BLOCK_CODE").

    Returns:
        A list of dicts, one per distinct ``group_key`` value in first-appearance
        order, each shaped
        ``{group_key: value, "property_info": {...}, "elements": {...}}``.
        ``property_info`` is the first distinct row of PROPERTY_COLS in the group.
        ``elements`` maps "ELEMENT CODE DESCRIPTION" to that element row's
        ELEMENT_COLS values; a later row with the same description overwrites
        an earlier one.

    Rows whose ``group_key`` is blank are skipped. Previously such rows made
    the empty subset's ``.iloc[0]`` raise IndexError.
    """
    df = _read_sheet(sheet_name)

    records = []
    # sort=False keeps groups in first-appearance order (as Series.unique() did).
    # dropna=True (the default) drops rows with a blank key.
    for value, group in df.groupby(group_key, sort=False):
        property_info = group[PROPERTY_COLS].drop_duplicates().iloc[0].to_dict()
        elements = {
            row["ELEMENT CODE DESCRIPTION"]: row.to_dict()
            for _, row in group[ELEMENT_COLS].drop_duplicates().iterrows()
        }
        records.append({
            group_key: value,
            "property_info": property_info,
            "elements": elements,
        })
    return records


def process_simple(sheet_name):
    """Collapse a flat sheet to one record per "Address".

    Returns:
        A list of ``{"ADDRESS": address, "to_add": {column: value, ...}}`` dicts
        in first-appearance order. ``to_add`` holds every column except "Address",
        taken from the first row for that address. Rows with a blank "Address"
        are skipped.
    """
    df = _read_sheet(sheet_name)

    records = []
    for address, group in df.groupby("Address", sort=False):
        first_row = group.iloc[0]
        records.append({
            "ADDRESS": address,
            "to_add": first_row.drop(labels=["Address"]).to_dict(),
        })
    return records


def combine_records_by_address(
    asset_records: List[Dict[str, Any]],
    simple_records: List[Dict[str, Any]],
    dest_key: str = "to_add",
    unique_identifier="Address",
) -> List[Dict[str, Any]]:
    """Merge process_complex() and process_simple() results on "ADDRESS".

    Each output record is a deep copy of the asset record for that address, or
    ``{"ADDRESS": addr}`` when only the simple side has it. The simple record's
    ``"to_add"`` dict, when present, is attached under ``dest_key``. Output
    covers the union of addresses, sorted.

    ``unique_identifier`` is accepted for backward compatibility but unused.
    """
    asset_by_addr = {r["ADDRESS"]: r for r in asset_records}
    simple_by_addr = {r["ADDRESS"]: r for r in simple_records}

    merged: List[Dict[str, Any]] = []
    for addr in sorted(set(asset_by_addr) | set(simple_by_addr)):
        base = copy.deepcopy(asset_by_addr.get(addr, {"ADDRESS": addr}))
        simple = simple_by_addr.get(addr)
        if simple:
            # Copy so merged records never alias the caller's simple_records.
            base[dest_key] = copy.deepcopy(simple.get("to_add", {}))
        merged.append(base)
    return merged


def combine_records_for_flats(
    assets: List[Dict[str, Any]], simple: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Fill each flat's "elements" with block-level elements it lacks.

    ``simple[0]`` is treated as the block record. Every element of the block
    whose description is not already in a flat's "elements" is added to that
    flat; the flat's own elements win on conflicts.

    ``assets`` is modified in place and also returned. It is returned unchanged
    if ``simple`` is empty or ``simple[0]`` is not a dict.
    """
    if not simple or not isinstance(simple[0], dict):
        return assets  # nothing to add

    block_elements = simple[0]["elements"]
    for record in assets:
        flat_elements = record["elements"]
        for ele_desc, element in block_elements.items():
            if ele_desc not in flat_elements:
                # Deep copy so flats do not share one mutable element dict.
                flat_elements[ele_desc] = copy.deepcopy(element)
    return assets


def _json_default(o):
    """``json.dump`` fallback for types the json module cannot encode."""
    # datetimes (incl. pandas.Timestamp, a datetime subclass) -> ISO 8601.
    if isinstance(o, (datetime, date, time)):
        return o.isoformat()
    # Decimal -> float (use str instead if exactness matters).
    if isinstance(o, Decimal):
        return float(o)
    if isinstance(o, set):
        return list(o)
    # numpy scalars/arrays, which pandas rows commonly yield.
    if isinstance(o, np.integer):
        return int(o)
    if isinstance(o, np.floating):
        return float(o)
    if isinstance(o, np.ndarray):
        return o.tolist()
    # Last resort (this also turns numpy.bool_ into "True"/"False" strings).
    return str(o)


def uprn_to_address():
    """Return ``{Address: UPRN}`` from the energy-breakdown sheet."""
    return _read_sheet(ENERGY_BREAKDOWN_SHEET).set_index("Address")["UPRN"].to_dict()


def stories_to_address():
    """Return ``{Address: Storeys}`` from the energy-breakdown sheet."""
    return _read_sheet(ENERGY_BREAKDOWN_SHEET).set_index("Address")["Storeys"].to_dict()


def parse_s3_uri(uri: str):
    """Split an S3 URI or virtual-hosted HTTPS S3 URL into ``(bucket, key)``.

    Supports:
      - s3://bucket-name/path/to/file
      - https://bucket-name.s3.region.amazonaws.com/path/to/file

    Raises:
        ValueError: for other schemes, or an HTTP(S) host whose second
            dot-separated label is not "s3".
    """
    parsed = urlparse(uri)
    if parsed.scheme == "s3":
        return parsed.netloc, parsed.path.lstrip("/")

    if parsed.scheme in ("http", "https"):
        host_parts = parsed.netloc.split(".")
        if len(host_parts) < 3 or host_parts[1] != "s3":
            raise ValueError("Not a valid S3 HTTPS URL format")
        return host_parts[0], parsed.path.lstrip("/")

    raise ValueError("Unsupported URI scheme")


@functools.lru_cache(maxsize=1)
def _s3_client():
    """Return a shared S3 client built from boto3's default credential chain.

    Credentials come from the environment (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY,
    shared config, or an attached IAM role). They are never embedded in source:
    the key pair formerly hard-coded here must be treated as leaked and rotated.
    """
    return boto3.client("s3", region_name=AWS_REGION)


def upload_json_to_s3(json_obj, dest_uri: str, location="decent_homes/raw_data") -> str:
    """Serialize ``json_obj`` and upload it next to ``dest_uri`` in S3.

    The object is written to
    ``<dirname(key of dest_uri)>/<location>/jsonified/<UTC %Y%m%d_%H%M%S>.json``
    in dest_uri's bucket. Two uploads to the same folder within the same
    second overwrite each other.

    Returns:
        The HTTPS S3 URL of the uploaded object. The object is still private
        if the bucket is.

    Raises:
        ValueError: if ``dest_uri`` is not a supported S3 URI (see parse_s3_uri).
    """
    bucket, dest_key = parse_s3_uri(dest_uri)
    base_folder = os.path.dirname(dest_key)  # e.g. "documents/<UPRN>"

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    # Skip empty segments so a bucket-root dest_uri does not yield a leading "/".
    json_key = "/".join(
        part for part in (base_folder, location, "jsonified", f"{timestamp}.json") if part
    )

    body = json.dumps(
        json_obj, ensure_ascii=False, indent=2, default=_json_default
    ).encode("utf-8")
    _s3_client().put_object(
        Bucket=bucket,
        Key=json_key,
        Body=body,
        ContentType="application/json",
        # Optional hardening: ServerSideEncryption="AES256",
    )

    return f"https://{bucket}.s3.{AWS_REGION}.amazonaws.com/{json_key}"


def generate_file_uri(UPRN):
    """Return the HTTPS folder URL (with trailing "/") for a property's documents."""
    return f"{DOCUMENTS_BASE_URL}/{UPRN}/"


def create_or_update_uploaded_file_entry(
    db_session, uprn: str, doc_type: ReportType, json_uri: str, s3_file_uri: str
):
    """Upsert the uploaded_files row identified by ``(uprn, doc_type)``.

    An existing row gets new JSON/file URIs and a fresh UTC upload timestamp;
    otherwise a new row is inserted. ``uprn`` is stored as ``str(uprn)``.
    Commits, refreshes and returns the ORM object. ``.one_or_none()`` raises
    if more than one row matches.
    """
    now = datetime.now(timezone.utc)
    existing = (
        db_session.query(uploaded_files)
        .filter(uploaded_files.uprn == str(uprn), uploaded_files.doc_type == doc_type)
        .one_or_none()
    )

    if existing:
        existing.s3_json_uri = json_uri
        existing.s3_json_upload_timestamp = now
        existing.s3_file_uri = s3_file_uri
        obj = existing
    else:
        obj = uploaded_files(
            doc_type=doc_type,
            s3_json_uri=json_uri,
            s3_json_upload_timestamp=now,
            s3_file_uri=s3_file_uri,
            uprn=str(uprn),
        )
        db_session.add(obj)

    db_session.commit()
    db_session.refresh(obj)
    return obj


def _case_insensitive_index(mapping: Dict[Any, Any]) -> Dict[str, Any]:
    """Re-key ``mapping`` by lower-cased string key; non-string keys are dropped."""
    return {k.lower(): v for k, v in mapping.items() if isinstance(k, str)}


def _attach_uprns(
    records: List[Dict[str, Any]],
    uprn_index: Dict[str, Any],
    storeys_index: Optional[Dict[str, Any]] = None,
) -> None:
    """Set record["UPRN"] (and optionally FLAT LEVEL) by case-insensitive address match.

    The lookup key is the part of record["ADDRESS"] before the first comma.
    Records with no match are left without a "UPRN" key.
    """
    for record in records:
        pseudo_name = str(record["ADDRESS"]).split(",")[0].lower()
        if pseudo_name not in uprn_index:
            continue
        record["UPRN"] = uprn_index[pseudo_name]
        if storeys_index is not None:
            record["property_info"]["FLAT LEVEL"] = storeys_index.get(pseudo_name)


def _process_property(record: Dict[str, Any]) -> None:
    """Upload one property's raw JSON, run decent_homes_calc, upload and register results."""
    uprn = record["UPRN"]
    # NOTE(review): if the UPRN column has blanks pandas reads it as float, so
    # paths/keys may end in ".0" - confirm against the sheet.
    print(uprn)
    file_uri = generate_file_uri(uprn)
    raw_json_uri = upload_json_to_s3(record, file_uri, location=RAW_DATA_LOCATION)

    # decent_homes_calc takes a file path, so write the record locally first.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    filepath = os.path.join(OUTPUT_DIR, f"{uprn}.json")
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(record, f, indent=2, ensure_ascii=False, default=_json_default)

    property_decent_home, decent_home_meta = decent_homes_calc(filepath)

    for payload, location, doc_type in (
        (property_decent_home, SUMMARY_LOCATION, ReportType.DECENT_HOMES_SUMMARY),
        (decent_home_meta, META_LOCATION, ReportType.DECENT_HOMES_PROPERTY_META),
    ):
        result_uri = upload_json_to_s3(payload, file_uri, location=location)
        with get_db_session() as session:
            create_or_update_uploaded_file_entry(
                db_session=session,
                uprn=uprn,
                doc_type=doc_type,
                json_uri=result_uri,
                s3_file_uri=raw_json_uri,
            )


def _process_records(records: List[Dict[str, Any]]) -> None:
    """Run _process_property for every record that has a UPRN; warn about the rest."""
    for record in records:
        if "UPRN" not in record:
            print(f"Skipping {record.get('ADDRESS')!r}: no UPRN match found")
            continue
        _process_property(record)


def handler(event, context):
    """Entry point: build house and flat records, then upload and register them.

    ``event`` and ``context`` are unused.
    """
    uprn_index = _case_insensitive_index(uprn_to_address())
    storeys_index = _case_insensitive_index(stories_to_address())

    # Houses: asset data merged with per-address EPC data.
    assets = process_complex("Houses Asset Data")
    simple = process_simple("Houses")
    houses = combine_records_by_address(assets, simple, dest_key="EPC_DATA")
    _attach_uprns(houses, uprn_index)
    _process_records(houses)

    # Flats: per-flat asset data, topped up with block-level elements.
    assets = process_complex("Chingford Rd 236-256 Properties")
    simple = process_complex("CHINGFORD ROAD 236-254 Asset Bl", "BLOCK_CODE")
    flats = combine_records_for_flats(assets, simple)
    _attach_uprns(flats, uprn_index, storeys_index=storeys_index)
    _process_records(flats)


# TODO(Jun-te): Spec of the questions we have for Waltham Forest.
# TODO(Khalim): A document describing our mapping and our understanding of the data.