# survey-extraction/deployment/lambda/walthamforest_etl/docker/app.py
# 2025-09-16 19:43:38 +01:00
#
# 130 lines
# 4 KiB
# Python

import pandas as pd
import json
from pprint import pprint
import os
import copy
from collections import defaultdict
from typing import List, Dict, Any, Union, Optional
def process_complex(
    sheet_name,
    group_key: str = "ADDRESS",
    df: Optional[pd.DataFrame] = None,
) -> List[Dict[str, Any]]:
    """Build one record per distinct ``group_key`` value of a worksheet.

    Each record carries the grouping value, the property columns taken from
    the first distinct row of the group, and a dict of the group's distinct
    element rows keyed by ``ELEMENT CODE DESCRIPTION``.

    Args:
        sheet_name: Worksheet passed to ``pd.read_excel``. Ignored when
            ``df`` is supplied.
        group_key: Column whose distinct values define the groups (callers
            in this file use ``"ADDRESS"`` and ``"BLOCK_CODE"``).
        df: Optional already-loaded frame. When ``None`` (the default) the
            sheet is read from the hard-coded workbook path.

    Returns:
        A list of ``{group_key: value, "property_info": {...},
        "elements": {...}}`` dicts, ordered by first appearance of each
        grouping value. Rows whose grouping value is missing are skipped.

    Raises:
        KeyError: If ``group_key`` is not a column, or if a non-empty sheet
            lacks one of the expected property/element columns.
    """
    if df is None:
        # NOTE(review): hard-coded relative path into a Downloads folder;
        # confirm it resolves in the deployed environment.
        df = pd.read_excel(
            "../../../../../home/Downloads/data.xlsx", sheet_name=sheet_name
        )

    element_cols = [
        "ELEMENT GROUP", "ELEMENT CODE", "ELEMENT CODE DESCRIPTION",
        "ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION",
        "ELEMENT DATE VALUE", "ELEMENT NUMERIC VALUE",
        "ELEMENT TEXT VALUE", "QUANTITY",
        "INSTALL DATE", "REMAINING LIFE", "ELEMENT COMMENTS",
    ]
    property_cols = [
        "PROP REF", "ADDRESS", "OWNERSHIP",
        "PROP STATUS", "PROP TYPE", "PROP SUB TYPE",
    ]

    records = []
    # groupby walks each group once instead of re-filtering the whole frame
    # per value, and it drops rows with a missing key. Previously a NaN key
    # yielded an empty subset and ``iloc[0]`` raised IndexError.
    # sort=False keeps the first-appearance order that ``unique()`` gave.
    for val, g in df.groupby(group_key, sort=False):
        property_info = g[property_cols].drop_duplicates().iloc[0].to_dict()

        # Keyed by ELEMENT CODE DESCRIPTION (could also use "ELEMENT CODE");
        # if a description repeats within a group, the last row wins.
        elements_dict = {}
        for _, row in g[element_cols].drop_duplicates().iterrows():
            elements_dict[row["ELEMENT CODE DESCRIPTION"]] = row.to_dict()

        records.append({
            group_key: val,
            "property_info": property_info,
            "elements": elements_dict,
        })
    return records
def process_simple(
    sheet_name,
    df: Optional[pd.DataFrame] = None,
) -> List[Dict[str, Any]]:
    """Build one record per distinct ``Address`` value of a worksheet.

    Only the first row seen for each address is used; every column of that
    row except ``Address`` is stored under ``"to_add"``.

    Args:
        sheet_name: Worksheet passed to ``pd.read_excel``. Ignored when
            ``df`` is supplied.
        df: Optional already-loaded frame. When ``None`` (the default) the
            sheet is read from the hard-coded workbook path.

    Returns:
        A list of ``{"ADDRESS": address, "to_add": {...}}`` dicts ordered by
        first appearance of each address. Rows without an address are
        skipped.

    Raises:
        KeyError: If the sheet has no ``Address`` column.
    """
    if df is None:
        # NOTE(review): hard-coded relative path into a Downloads folder;
        # confirm it resolves in the deployed environment.
        df = pd.read_excel(
            "../../../../../home/Downloads/data.xlsx", sheet_name=sheet_name
        )

    # Drop rows with a missing address first: a NaN address used to match no
    # rows at all and made ``iloc[0]`` raise IndexError. Keeping the first
    # row per address replaces the per-address filter over the whole frame.
    first_rows = df.dropna(subset=["Address"]).drop_duplicates(subset=["Address"])

    return [
        {
            "ADDRESS": row["Address"],
            # every column except Address itself
            "to_add": row.drop(labels=["Address"]).to_dict(),
        }
        for _, row in first_rows.iterrows()
    ]
def combine_records_by_address(
    asset_records: List[Dict[str, Any]],
    simple_records: List[Dict[str, Any]],
    dest_key: str = "to_add",
    unique_identifier="Address"
) -> List[Dict[str, Any]]:
    """Merge process_complex() and process_simple() results by ADDRESS.

    Every address present in either input yields exactly one output record.
    The record starts as a deep copy of the asset record (or a bare
    ``{"ADDRESS": addr}`` when there is none); if a simple record exists for
    the address, a copy of its ``"to_add"`` dict is stored under ``dest_key``.

    Args:
        asset_records: Records that each carry an ``"ADDRESS"`` key.
        simple_records: Records that each carry ``"ADDRESS"`` and usually
            ``"to_add"``.
        dest_key: Key under which the ``"to_add"`` payload is stored.
        unique_identifier: Currently unused; kept so existing callers that
            pass it keep working. Records are always indexed by ``"ADDRESS"``.

    Returns:
        Merged records sorted by address. Inputs are never mutated and the
        output shares no mutable state with them.

    Raises:
        KeyError: If any input record lacks the ``"ADDRESS"`` key.
    """
    # Index inputs by ADDRESS; a later duplicate address replaces an earlier one.
    asset_by_addr = {r["ADDRESS"]: r for r in asset_records}
    simple_by_addr = {r["ADDRESS"]: r for r in simple_records}

    # Use the union of addresses from both sources.
    all_addresses = set(asset_by_addr) | set(simple_by_addr)
    try:
        ordered = sorted(all_addresses)
    except TypeError:
        # Mixed key types (e.g. a numeric cell among text addresses) cannot
        # be compared with each other; fall back to ordering by their text.
        ordered = sorted(all_addresses, key=str)

    merged: List[Dict[str, Any]] = []
    for addr in ordered:
        base = copy.deepcopy(asset_by_addr.get(addr, {"ADDRESS": addr}))
        simple = simple_by_addr.get(addr)
        if simple is not None:
            # Copy the payload too, so editing the merged output cannot
            # silently change the caller's simple_records.
            base[dest_key] = copy.deepcopy(simple.get("to_add", {}))
        merged.append(base)
    return merged
def combine_records_for_flats(
    assets: List[Dict[str, Any]],
    simple: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Attach BLOCK_INFO (from simple[0]) to each asset record in assets.

    Args:
        assets: Asset records; each dict entry is updated in place.
        simple: Block-level records. Only the first entry is used; any
            further entries are ignored.

    Returns:
        The same ``assets`` object. It is returned unchanged when ``simple``
        is empty or its first entry is not a dict. All updated records
        reference the same ``simple[0]`` dict (no copy is made).
    """
    if not simple or not isinstance(simple[0], dict):
        return assets  # nothing to add

    block_info = simple[0]
    for record in assets:
        # Make sure record is a dict: anything else cannot carry BLOCK_INFO
        # and used to raise AttributeError on ``.update``.
        if isinstance(record, dict):
            record["BLOCK_INFO"] = block_info
    return assets
def get_energy_information(sheet_name=""):
    """Read one worksheet of the workbook and return it as a DataFrame.

    NOTE(review): this function is unfinished. The original passed an empty
    sheet name and discarded the loaded frame; which worksheet holds the
    energy data still has to be confirmed.

    Args:
        sheet_name: Worksheet to load. The default is the original empty
            placeholder, which can never match a worksheet.

    Returns:
        Whatever ``pd.read_excel`` returns for ``sheet_name``.

    Raises:
        ValueError: If ``sheet_name`` is the empty string. pandas raises the
            same exception type for an unknown worksheet, but only after
            opening the file.
    """
    if sheet_name == "":
        raise ValueError(
            "get_energy_information: no sheet name given; "
            "pass the worksheet that holds the energy data"
        )
    # NOTE(review): hard-coded relative path into a Downloads folder;
    # confirm it resolves in the deployed environment.
    return pd.read_excel(
        "../../../../../home/Downloads/data.xlsx", sheet_name=sheet_name
    )
# TODO: add the UPRN to every record.
def handler(event, context):
    """Lambda entry point: build the merged house and flat records.

    ``event`` and ``context`` are accepted but not read. The merged lists are
    built and then dropped; nothing is returned yet (see the TODO at the end).
    """
    # Houses: per-address asset data, enriched with the "Houses" sheet
    # columns stored under EPC_DATA.
    house_assets = process_complex("Houses Asset Data")
    house_details = process_simple("Houses")
    houses = combine_records_by_address(
        house_assets, house_details, dest_key="EPC_DATA"
    )

    # Flats: per-address asset data, each tagged with the block-level record.
    # NOTE(review): the two sheet names disagree on the range (236-256 vs
    # 236-254); confirm both match the workbook's actual tab names.
    flat_assets = process_complex("Chingford Rd 236-256 Properties")
    block_assets = process_complex("CHINGFORD ROAD 236-254 Asset Bl", "BLOCK_CODE")
    flats = combine_records_for_flats(flat_assets, block_assets)

    # TODO: run a script that uploads to S3 -> uprn -> jsonified -> walthamforest -> uri