refactoring get_data methodology

This commit is contained in:
Khalim Conn-Kowlessar 2025-02-20 08:21:59 +00:00
parent 63dbda005d
commit 47ad0e8275
2 changed files with 53 additions and 119 deletions

View file

@ -295,6 +295,7 @@ class AssetList:
self.variable_mappings = {}
self.rename_map = {}
self.keep_variables = []
def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
@ -454,8 +455,8 @@ class AssetList:
self.landlord_existing_pv
]
# Keep just non-null variables (e.g landlord may not provide uprn
variables = [v for v in variables if v is not None]
rename = {
self.keep_variables = [v for v in variables if v is not None]
self.rename_map = {
self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID,
self.address1_colname: self.STANDARD_ADDRESS_1,
self.postcode_colname: self.STANDARD_POSTCODE,
@ -467,21 +468,17 @@ class AssetList:
self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM,
self.landlord_existing_pv: self.STANDARD_EXISTING_PV
}
rename = {k: v for k, v in rename.items() if k is not None}
self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None}
if self.non_intrusives_present:
variables += self.NON_INTRUSIVES_COLNAMES
rename = {
**rename,
self.keep_variables += self.NON_INTRUSIVES_COLNAMES
self.rename_map = {
**self.rename_map,
**dict(
zip(self.NON_INTRUSIVES_COLNAMES, ["non-intrusives: " + c for c in self.NON_INTRUSIVES_COLNAMES])
)
}
self.standardised_asset_list = self.standardised_asset_list[variables].rename(
columns=rename
)
# We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y)
self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
self.full_address_colname
@ -498,10 +495,9 @@ class AssetList:
)
# Clear our build year column
# We attempt to process the year built column
if self.landlord_year_built is not None:
# We check if we have a datetime
# We check if we have a datetime - year built has not been renamed
if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime):
# We treat any string columns - with common values we see
self.standardised_asset_list[self.landlord_year_built] = (
@ -581,7 +577,8 @@ class AssetList:
]
# Apply renames to our standard names
self.standardised_asset_list = self.standardised_asset_list.rename(
# Perform final variable selection and renaming:
self.standardised_asset_list = self.standardised_asset_list[self.keep_variables].rename(
columns=self.rename_map
)

View file

@ -1,10 +1,10 @@
import os
import time
from BaseUtility import Definitions
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from datetime import datetime
from BaseUtility import Definitions
from asset_list.AssetList import AssetList
from asset_list.mappings.property_type import PROPERTY_MAPPING
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
@ -31,8 +31,8 @@ EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(
asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, uprn_column=None,
epc_api_only=False
asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map,
uprn_column=None, epc_api_only=False, row_id_name="row_id"
):
epc_data = []
errors = []
@ -103,12 +103,12 @@ def get_data(
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
no_epc.append(home["row_id"])
no_epc.append(home[row_id_name])
continue
if epc_api_only:
epc = {
"row_id": home["row_id"],
row_id_name: home[row_id_name],
**searcher.newest_epc.copy()
}
@ -144,7 +144,7 @@ def get_data(
time.sleep(np.random.uniform(0.1, 1))
epc = {
"row_id": home["row_id"],
row_id_name: home[row_id_name],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"],
"find_my_epc_data": find_epc_data,
@ -152,7 +152,7 @@ def get_data(
epc_data.append(epc)
except Exception as e:
errors.append(home["row_id"])
errors.append(home[row_id_name])
time.sleep(5)
return epc_data, errors, no_epc
@ -402,113 +402,48 @@ def app():
# # If we have the non-intrusives data, this should be true
# HAS_NON_INTRUSIVES = True
asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
if MISSING_POSTCODES_METHOD is not None:
if MISSING_POSTCODES_METHOD == "last_two_words":
# Replace any double spaces
asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace(' ', ' ', regex=False)
asset_list["Postcode"] = np.where(
pd.isnull(asset_list["Postcode"]),
asset_list[FULLADDRESS_COLUMN].str.split(" ").str[-2:].str.join(" "),
asset_list["Postcode"]
)
else:
raise ValueError(f"Method {MISSING_POSTCODES_METHOD} not recognized")
asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
asset_list["row_id"] = asset_list.index
# We clean up portential non-breaking spaces, and double spaces
for col in [c for c in [POSTCODE_COLUMN, FULLADDRESS_COLUMN, ADDRESS1_COLUMN] if c is not None]:
asset_list[col] = asset_list[col].astype(str)
asset_list[col] = asset_list[col].str.replace('\xa0', ' ', regex=False)
asset_list[col] = asset_list[col].str.replace(' ', ' ', regex=False)
asset_list[col] = asset_list[col].str.strip()
if ADDRESS1_COLUMN is None:
ADDRESS1_COLUMN = "address1_extracted"
asset_list = extract_address1(
asset_list=asset_list,
full_address_col=FULLADDRESS_COLUMN,
postcode_col=POSTCODE_COLUMN,
method=ADDRESS1_METHOD
)
if FULLADDRESS_COLUMN is None:
FULLADDRESS_COLUMN = "fulladdress_extracted"
# We concatenate the columns in ADDRESS_COLS_TO_CONCAT, on commas
# Sometimes, some of the columns are empty, so we need to remove them
asset_list[FULLADDRESS_COLUMN] = asset_list[ADDRESS_COLS_TO_CONCAT].apply(
lambda x: ", ".join([y for y in x if not pd.isnull(y)]), axis=1
)
# We clean up portential non-breaking spaces, and double spaces
asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].astype(str)
asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace('\xa0', ' ', regex=False)
asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace(' ', ' ', regex=False)
if UPRN_COLUMN is not None:
# Check if it's numeric and if so, make sure it's an integer
def convert_uprn(x):
if pd.isnull(x):
return x
# check if numeric
if np.isreal(x):
return str(int(x))
if str(x).isdigit():
return str(int(x))
return x
asset_list[UPRN_COLUMN] = asset_list[UPRN_COLUMN].apply(convert_uprn)
# We attempt to process the year built column
if PROPERTY_YEAR_BUILT is not None:
# We check if we have a datetime
if isinstance(asset_list[PROPERTY_YEAR_BUILT].iloc[0], datetime):
# We treat any string columns - with common values we see
datetime_remap = {
"Pre 1900": datetime(year=1899, month=12, day=31),
}
asset_list[PROPERTY_YEAR_BUILT] = asset_list[PROPERTY_YEAR_BUILT].replace(datetime_remap)
asset_list[PROPERTY_YEAR_BUILT] = pd.to_datetime(asset_list[PROPERTY_YEAR_BUILT])
# Convert this to year
asset_list[PROPERTY_YEAR_BUILT] = asset_list[PROPERTY_YEAR_BUILT].dt.year
# We check for duplicated addresses
asset_list["deduper"] = asset_list[FULLADDRESS_COLUMN] + asset_list[POSTCODE_COLUMN]
### We retrieve the EPC data
# We chunk up this data into 5000 rows at a time
# Create the chunks directory
if not os.path.exists(os.path.join(DATA_FOLDER, "Chunks")):
os.makedirs(os.path.join(DATA_FOLDER, "Chunks"))
chunk_size = 5000
errors = []
no_epc = []
force_retrieve_data = False
skip = None # Used to skip already completed chunks
for i in range(0, len(asset_list), chunk_size):
chunk_size = 5000
filename = "Chunk {i}.csv"
download_folder = os.path.join(DATA_FOLDER, "Chunks")
if not os.path.exists(download_folder):
os.makedirs(download_folder)
chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size))
downloaded_files = {filename.format(i=i) for i in chunk_indexes}
# We check if we have files associated to these files already and if we do, and we do not want to force the
# fetching of the data, we skip
folder_contents = os.listdir(download_folder)
if all(x in folder_contents for x in downloaded_files):
skip = max(chunk_indexes)
for i in range(0, len(asset_list.standardised_asset_list), chunk_size):
print(f"Processing chunk {i} to {i + chunk_size}")
if skip is not None:
if skip is not None and not force_retrieve_data:
if i <= skip:
continue
chunk = asset_list[i:i + chunk_size]
chunk = asset_list.standardised_asset_list[i:i + chunk_size]
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
asset_list=chunk,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
fulladdress_column=asset_list.STANDARD_FULL_ADDRESS,
address1_column=asset_list.STANDARD_ADDRESS_1,
postcode_column=asset_list.STANDARD_POSTCODE,
manual_uprn_map=MANUAL_UPRN_MAP,
uprn_column=UPRN_COLUMN
uprn_column=asset_list.STANDARD_UPRN
)
# We now retrieve any failed properties
chunk_failed = chunk[chunk["row_id"].isin(errors)]
chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)]
epc_data_failed, _, _ = get_data(
asset_list=chunk_failed,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN,
@ -517,20 +452,22 @@ def app():
)
epc_data_chunk.extend(epc_data_failed)
errors.extend(errors_chunk)
no_epc.extend(no_epc_chunk)
# Append the failed data to the main data
# Store the chunk locally as a csv
pd.DataFrame(epc_data_chunk).to_csv(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i}.csv"), index=False)
# Store the errors and no-data locally
with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} errors.json"), "w") as f:
json.dump(errors_chunk, f)
with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} nodata.csv"), "w") as f:
json.dump(no_epc_chunk, f)
# We read in and concatenate the created created chunks
chunks_folder = os.path.join(DATA_FOLDER, "Chunks")
# List the contents
chunk_files = os.listdir(chunks_folder)
epc_data = []
for file in chunk_files:
csv_data = pd.read_csv(os.path.join(chunks_folder, file))
for file in downloaded_files:
csv_data = pd.read_csv(os.path.join(download_folder, file))
# We need to convert the recommendations back to a list
csv_data["recommendations"] = csv_data["recommendations"].apply(eval)
csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval)