Model/model_data/cleaner_app.py
2023-09-22 10:20:37 +01:00

94 lines
4 KiB
Python

from tqdm import tqdm
import os
import pandas as pd
import msgpack
from model_data.EpcClean import EpcClean
from model_data.simulation_system.core.Settings import EARLIEST_EPC_DATE
from pathlib import Path
from utils.s3 import save_data_to_s3
# Absolute prefix (as a plain string) of the folder holding the Land Registry
# CSV extracts; it is resolved relative to the directory containing this file.
_LAND_REGISTRY_PREFIX = os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/"

# Full string paths of every Land Registry ``pp-*.csv`` extract, newest first.
LAND_REGISTRY_PATHS = [
    _LAND_REGISTRY_PREFIX + csv_name
    for csv_name in (
        "pp-monthly-update-new-version.csv",
        "pp-2022 (1).csv",
        "pp-2021.csv",
        "pp-2020.csv",
        "pp-2019.csv",
        "pp-2018.csv",
        "pp-2017-part1.csv",
        "pp-2017-part2.csv",
    )
]

# Folder containing one sub-directory of EPC certificates per local authority.
# Unlike the paths above this is a ``Path`` and is not made absolute.
EPC_DIRECTORY = Path(__file__).parent.joinpath(
    "model_data",
    "simulation_system",
    "data",
    "all-domestic-certificates",
)

# Deployment environment name; falls back to "dev" when the variable is unset.
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")
def _load_epc_records(directory):
    """Read ``certificates.csv`` from *directory* and return it as a list of row dicts."""
    certificates = pd.read_csv(directory / "certificates.csv", low_memory=False)
    # Rename the columns to the same format as the API returns
    # (lower-case, hyphen-separated).
    certificates.columns = [column.replace("_", "-").lower() for column in certificates.columns]
    # Keep only certificates lodged on or after the earliest date of interest.
    certificates = certificates[certificates["lodgement-date"] >= EARLIEST_EPC_DATE]
    # Convert to a list of dictionaries, as returned by the API.
    return certificates.to_dict("records")


def _merge_cleaned(cleaned_data, seen_descriptions, new_cleaned):
    """Append to *cleaned_data* the records of *new_cleaned* whose description is not stored yet."""
    for component, records in new_cleaned.items():
        if component not in cleaned_data:
            # Copy so that later extensions never mutate the cleaner's own list.
            cleaned_data[component] = list(records)
            seen_descriptions[component] = {record["original_description"] for record in records}
            continue

        # A persistent set per component gives O(1) membership tests instead of
        # rebuilding and scanning a list of every stored description each time.
        seen = seen_descriptions[component]
        # Filter against what was stored *before* this batch, so duplicates
        # inside a single batch are still surfaced by the final uniqueness check.
        unseen = [record for record in records if record["original_description"] not in seen]
        cleaned_data[component].extend(unseen)
        seen.update(record["original_description"] for record in unseen)


def _ensure_unique_descriptions(cleaned_data):
    """Raise ``ValueError`` if any component holds the same original description twice."""
    for records in cleaned_data.values():
        descriptions = [record["original_description"] for record in records]
        if len(descriptions) != len(set(descriptions)):
            raise ValueError("Duplicated descriptions found, check me")


def app():
    """
    Build the dataset of cleaned EPC description fields and upload it to S3.

    Every sub-directory of ``EPC_DIRECTORY`` is expected to contain a
    ``certificates.csv`` file. Each file is loaded, reshaped to look like the
    records the EPC API returns, and passed through ``EpcClean``. The cleaned
    entries are merged per key of ``cleaner.cleaned``, keeping the first entry
    seen for each ``original_description``, so that when we get new properties
    we can quickly sanitise any description data.

    The merged result is serialised with msgpack and stored as a single object
    in the ``retrofit-data-{ENVIRONMENT}`` bucket.

    Currently, this application is just run on a local machine.

    Raises:
        ValueError: if any key ends up with duplicated original descriptions.
    """
    cleaned_data = {}
    # Descriptions already stored for each key of ``cleaned_data``.
    seen_descriptions = {}

    epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]

    # NOTE(review): directory names appear to follow
    # ``<prefix>-<gss_code>-<local_authority>``; neither part is needed for
    # cleaning, so the name is not parsed here.
    for directory in tqdm(epc_directories):
        # Incorporate input data into cleaning
        cleaner = EpcClean(_load_epc_records(directory))
        cleaner.clean()

        # Extend cleaned_data with any descriptions we have not seen before
        _merge_cleaned(cleaned_data, seen_descriptions, cleaner.cleaned)

        # TODO: Add property age band into this
        # uvalue_estimates = UvalueEstimations(data=data)
        # uvalue_estimates.get_estimates(cleaner=cleaner)
        # # TODO: Store these to a s3
        # uvalue_estimates.walls
        # uvalue_estimates.floors
        # uvalue_estimates.roofs

    # Basic check to make sure all descriptions are unique
    _ensure_unique_descriptions(cleaned_data)

    # We store a singular file however we could store the data under the following file path:
    # cleaned_epc_data/{component}/{original_description}/cleaned.bson
    # where component is one of the keys of cleaned_data. If we store it against the original data, this
    # data being read in will be extremely small, meaning quicker load times. We'll begin by storing as a single
    # file and monitor usage patterns to see if it makes sense to split the data up
    #
    # NOTE: the payload is msgpack-encoded even though the object key ends in
    # ".bson"; the key is left unchanged so existing readers keep working.
    save_data_to_s3(
        data=msgpack.packb(cleaned_data, use_bin_type=True),
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name=f"retrofit-data-{ENVIRONMENT}",
    )
if __name__ == "__main__":
    # Script entry point: announce the run, then execute the cleaning pipeline.
    print("Initialising cleaner app run")
    app()