Model/etl/epc_clean/app.py
2025-11-30 18:16:09 +00:00

104 lines
4.3 KiB
Python

from tqdm import tqdm
import os
import pandas as pd
import msgpack
import inspect
from datetime import datetime
from etl.epc_clean.EpcClean import EpcClean
from etl.epc.settings import EARLIEST_EPC_DATE
from pathlib import Path
from utils.s3 import save_data_to_s3, read_from_s3
src_file_path = inspect.getfile(lambda: None)
LAND_REGISTRY_PATHS = [
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-monthly-update-new-version.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2022 (1).csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2021.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2020.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2019.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2018.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2017-part1.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2017-part2.csv",
]
EPC_DIRECTORY = Path("/Users/khalimconn-kowlessar/Downloads") / "all-domestic-certificates"
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev")
def app():
"""
For a pre-defined list of constituencies and property data_types, we'll download EPC data from the API
and produce a dataset of cleaned fields so that when we get new epc, we can quickly
sanitise any description data
Currently, this application is just run on a local machine
"""
cleaned_data = {}
epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]
errors = []
for directory in tqdm(epc_directories):
try:
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
# Take just date before the date threshold
data = data[data["lodgement-date"] >= "2011-01-01"]
# Convert to list of dictioaries as returned by the api
data = data.to_dict("records")
# Incorporate input data into cleaning
cleaner = EpcClean(data)
cleaner.clean()
# Extended cleaned_data
for k, data in cleaner.cleaned.items():
if k not in cleaned_data:
cleaned_data[k] = data
else:
existing_descriptions = [x["original_description"] for x in cleaned_data[k]]
new_data = [x for x in data if x["original_description"] not in existing_descriptions]
cleaned_data[k].extend(new_data)
except Exception as e:
errors.append(directory)
if errors:
raise ValueError("We have errors")
# Basic check to make sure all descriptions are unique
for _, cleaned in cleaned_data.items():
descriptions = [x["original_description"] for x in cleaned]
if len(descriptions) != len(set(descriptions)):
raise ValueError("Duplicated descriptions found, check me")
# We store a singular file however we could store the data under the following file path:
# cleaned_epc_data/{component}/{original_description}/cleaned.bson
# where component is one of the keys of cleaned_data. If we store it against the original data, this
# data being read in will be extremely small, meaning quicker load times. We'll begin by storing as a single
# file and monitor usage patterns to see if it makes sense to split the data up
cleaned_historic = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name=f"retrofit-data-{ENVIRONMENT}"
)
cleaned_historic = msgpack.unpackb(cleaned_historic, raw=False)
save_data_to_s3(
data=msgpack.packb(cleaned_historic, use_bin_type=True),
s3_file_name=f"cleaned_epc_data/archive/{str(datetime.now())} - cleaned.bson",
bucket_name=f"retrofit-data-{ENVIRONMENT}"
)
save_data_to_s3(
data=msgpack.packb(cleaned_data, use_bin_type=True),
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name=f"retrofit-data-{ENVIRONMENT}"
)
if __name__ == "__main__":
print("Initialising cleaner app run")
app()