Merge pull request #87 from Hestia-Homes/feautre/walthamforest_etl

publish to production
This commit is contained in:
Jun-te Kim 2025-09-22 10:28:27 +01:00 committed by GitHub
commit 243ff83855
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 137 additions and 5 deletions

View file

@ -5,6 +5,9 @@ import os
import copy
from collections import defaultdict
from typing import List, Dict, Any, Union, Optional
import boto3
import datetime
from urllib.parse import urlparse
def process_complex(sheet_name, group_key="ADDRESS"):
df = pd.read_excel("../../../../../home/Downloads/data.xlsx", sheet_name=sheet_name)
@ -108,21 +111,150 @@ def combine_records_for_flats(assets: dict, simple: list) -> dict:
return assets
def get_energy_information():
df = pd.read_excel("../../../../../home/Downloads/data.xlsx", sheet_name="")
# add uprn to everything
def _json_default(o):
from datetime import date, datetime, time
from decimal import Decimal
# datetimes → ISO 8601 strings
if isinstance(o, (datetime, date, time)):
return o.isoformat()
# decimals → float (or str if you need exactness)
if isinstance(o, Decimal):
return float(o)
# sets → lists
if isinstance(o, set):
return list(o)
# numpy/pandas types (optional)
try:
import numpy as np
import pandas as pd
if isinstance(o, (np.integer,)):
return int(o)
if isinstance(o, (np.floating,)):
return float(o)
if isinstance(o, (np.ndarray,)):
return o.tolist()
if isinstance(o, (pd.Timestamp,)):
return o.isoformat()
except Exception:
pass
# last resort: string
return str(o)
def uprn_to_address():
df = pd.read_excel("../../../../../home/Downloads/data.xlsx", sheet_name="All Energy Breakdown ")
mapping = df.set_index('Address')['UPRN'].to_dict()
return mapping
def parse_s3_uri(uri: str):
"""
Parse an S3 URI or HTTPS S3 URL into bucket and key.
Supports formats:
- s3://bucket-name/path/to/file
- https://bucket-name.s3.region.amazonaws.com/path/to/file
"""
parsed = urlparse(uri)
if parsed.scheme == "s3":
# s3://bucket/key
bucket = parsed.netloc
key = parsed.path.lstrip("/")
elif parsed.scheme in ("http", "https"):
# https://bucket-name.s3.region.amazonaws.com/key
host_parts = parsed.netloc.split(".")
if len(host_parts) >= 3 and host_parts[1] == "s3":
bucket = host_parts[0]
else:
raise ValueError("Not a valid S3 HTTPS URL format")
key = parsed.path.lstrip("/")
else:
raise ValueError("Unsupported URI scheme")
return bucket, key
def upload_json_to_s3(json_obj, dest_uri: str) -> str:
"""
Upload a JSON-serializable object to S3 at the given s3:// or https S3 URL.
Returns the public-style HTTPS S3 URL (still private if bucket is private).
"""
bucket, pdf_key = parse_s3_uri(dest_uri)
base_folder = os.path.dirname(pdf_key) # e.g. ".../report"
# Build jsonified folder + timestamp filename
timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
json_key = f"{base_folder}/walthamforest_raw_data/jsonified/{timestamp}.json"
# Same region/creds you used for download
aws_access_key = "AKIAU5A36PPNJMZZ3KRW"
aws_secret_key = "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"
aws_region = "eu-west-2"
s3 = boto3.client(
"s3",
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
region_name=aws_region
)
body = json.dumps(json_obj, ensure_ascii=False, indent=2, default=_json_default).encode("utf-8")
s3.put_object(
Bucket=bucket,
Key=json_key,
Body=body,
ContentType="application/json"
# Optional hardening:
# , ServerSideEncryption="AES256"
)
# Return an HTTPS-style S3 URL (matches your input style)
return f"https://{bucket}.s3.{aws_region}.amazonaws.com/{json_key}"
def generate_file_uri(UPRN):
timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
file_uri = f"https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents/{UPRN}/"
return file_uri
def handler(event, context):
uprn_mapping = uprn_to_address()
# read data for houses only
assets = process_complex("Houses Asset Data")
simple = process_simple("Houses")
houses = combine_records_by_address(assets, simple, dest_key="EPC_DATA")
for house in houses:
pseudo_name = house["ADDRESS"].split(",")[0]
if pseudo_name.lower() in (k.lower() for k in uprn_mapping.keys()):
house.update({"UPRN": uprn_mapping[pseudo_name.upper()]})
#upload to s3
for house in houses:
print(house["UPRN"])
json_uri = upload_json_to_s3(house, generate_file_uri(house["UPRN"]))
# read data for flats
assets = process_complex("Chingford Rd 236-256 Properties")
simple = process_complex("CHINGFORD ROAD 236-254 Asset Bl", "BLOCK_CODE")
flats = combine_records_for_flats(assets, simple)
for house in flats:
pseudo_name = house["ADDRESS"].split(",")[0]
if pseudo_name.lower() in (k.lower() for k in uprn_mapping.keys()):
print(uprn_mapping[pseudo_name.upper()])
house.update({"UPRN": uprn_mapping[pseudo_name.upper()]})
for house in flats:
print(house["UPRN"])
json_uri = upload_json_to_s3(house, generate_file_uri(house["UPRN"]))
# with open("flat.json", "w") as f:
# json.dump(houses[0], f, indent=2, ensure_ascii=False, default=_json_default)

View file

@ -199,7 +199,7 @@ filtered_dfs.append(_)
# POST EPC
post_epc = df[
df["post epc"].str.lower().isin(["completed & uploaded"])
df["post epc/epr"].str.lower().isin(["completed & uploaded"])
].copy()
post_epc["job_type"] = "POST EPC"
post_epc["evidence_record"] = None
@ -209,7 +209,7 @@ filtered_dfs.append(post_epc)
# POST EPR
post_epr = df[
df["post epc"].str.lower().isin(["post epr completed"])
df["post epc/epr"].str.lower().isin(["post epr completed"])
].copy()
post_epr["job_type"] = "POST EPR"
post_epr["evidence_record"] = None