diff --git a/asset_list/AssetList.py b/asset_list/AssetList.py index 8379cc2a..14dce093 100644 --- a/asset_list/AssetList.py +++ b/asset_list/AssetList.py @@ -295,6 +295,7 @@ class AssetList: self.variable_mappings = {} self.rename_map = {} + self.keep_variables = [] def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"): @@ -454,8 +455,8 @@ class AssetList: self.landlord_existing_pv ] # Keep just non-null variables (e.g landlord may not provide uprn - variables = [v for v in variables if v is not None] - rename = { + self.keep_variables = [v for v in variables if v is not None] + self.rename_map = { self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID, self.address1_colname: self.STANDARD_ADDRESS_1, self.postcode_colname: self.STANDARD_POSTCODE, @@ -467,21 +468,17 @@ class AssetList: self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM, self.landlord_existing_pv: self.STANDARD_EXISTING_PV } - rename = {k: v for k, v in rename.items() if k is not None} + self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None} if self.non_intrusives_present: - variables += self.NON_INTRUSIVES_COLNAMES - rename = { - **rename, + self.keep_variables += self.NON_INTRUSIVES_COLNAMES + self.rename_map = { + **self.rename_map, **dict( zip(self.NON_INTRUSIVES_COLNAMES, ["non-intrusives: " + c for c in self.NON_INTRUSIVES_COLNAMES]) ) } - self.standardised_asset_list = self.standardised_asset_list[variables].rename( - columns=rename - ) - # We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y) self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[ self.full_address_colname @@ -498,10 +495,9 @@ class AssetList: ) # Clear our build year column - # We attempt to process the year built column if self.landlord_year_built is not None: - # We check if we have a datetime + # We check if we have a datetime - year built has not been renamed if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime): # We treat any string columns - with common values we see self.standardised_asset_list[self.landlord_year_built] = ( @@ -581,7 +577,8 @@ class AssetList: ] # Apply renames to our standard names - self.standardised_asset_list = self.standardised_asset_list.rename( + # Perform final variable selection and renaming: + self.standardised_asset_list = self.standardised_asset_list[self.keep_variables].rename( columns=self.rename_map ) diff --git a/etl/route_march_data_pull/app.py b/etl/route_march_data_pull/app.py index d520895d..83e5e0ca 100644 --- a/etl/route_march_data_pull/app.py +++ b/etl/route_march_data_pull/app.py @@ -1,10 +1,10 @@ import os import time -from BaseUtility import Definitions +import json import pandas as pd import numpy as np from tqdm import tqdm -from datetime import datetime +from BaseUtility import Definitions from asset_list.AssetList import AssetList from asset_list.mappings.property_type import PROPERTY_MAPPING from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS @@ -31,8 +31,8 @@ EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") def get_data( - asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, uprn_column=None, - epc_api_only=False + asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, + uprn_column=None, epc_api_only=False, row_id_name="row_id" ): epc_data = [] errors = [] @@ -103,12 +103,12 @@ def get_data( searcher.find_property(skip_os=True) if searcher.newest_epc is None: - no_epc.append(home["row_id"]) + no_epc.append(home[row_id_name]) continue if epc_api_only: epc = { - "row_id": home["row_id"], + row_id_name: home[row_id_name], **searcher.newest_epc.copy() } @@ -144,7 +144,7 @@ def get_data( time.sleep(np.random.uniform(0.1, 1)) epc = { - "row_id": home["row_id"], + row_id_name: home[row_id_name], **searcher.newest_epc.copy(), "recommendations": property_recommendations["rows"], "find_my_epc_data": find_epc_data, @@ -152,7 +152,7 @@ def get_data( epc_data.append(epc) except Exception as e: - errors.append(home["row_id"]) + errors.append(home[row_id_name]) time.sleep(5) return epc_data, errors, no_epc @@ -402,113 +402,48 @@ def app(): # # If we have the non-intrusives data, this should be true # HAS_NON_INTRUSIVES = True - asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME) - - if MISSING_POSTCODES_METHOD is not None: - if MISSING_POSTCODES_METHOD == "last_two_words": - # Replace any double spaces - asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace(' ', ' ', regex=False) - asset_list["Postcode"] = np.where( - pd.isnull(asset_list["Postcode"]), - asset_list[FULLADDRESS_COLUMN].str.split(" ").str[-2:].str.join(" "), - asset_list["Postcode"] - ) - else: - raise ValueError(f"Method {MISSING_POSTCODES_METHOD} not recognized") - - asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index() - asset_list["row_id"] = asset_list.index - - # We clean up portential non-breaking spaces, and double spaces - for col in [c for c in [POSTCODE_COLUMN, FULLADDRESS_COLUMN, ADDRESS1_COLUMN] if c is not None]: - asset_list[col] = asset_list[col].astype(str) - asset_list[col] = asset_list[col].str.replace('\xa0', ' ', regex=False) - asset_list[col] = asset_list[col].str.replace(' ', ' ', regex=False) - asset_list[col] = asset_list[col].str.strip() - - if ADDRESS1_COLUMN is None: - ADDRESS1_COLUMN = "address1_extracted" - asset_list = extract_address1( - asset_list=asset_list, - full_address_col=FULLADDRESS_COLUMN, - postcode_col=POSTCODE_COLUMN, - method=ADDRESS1_METHOD - ) - - if FULLADDRESS_COLUMN is None: - FULLADDRESS_COLUMN = "fulladdress_extracted" - # We concatenate the columns in ADDRESS_COLS_TO_CONCAT, on commas - # Sometimes, some of the columns are empty, so we need to remove them - asset_list[FULLADDRESS_COLUMN] = asset_list[ADDRESS_COLS_TO_CONCAT].apply( - lambda x: ", ".join([y for y in x if not pd.isnull(y)]), axis=1 - ) - - # We clean up portential non-breaking spaces, and double spaces - asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].astype(str) - asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace('\xa0', ' ', regex=False) - asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace(' ', ' ', regex=False) - - if UPRN_COLUMN is not None: - # Check if it's numeric and if so, make sure it's an integer - def convert_uprn(x): - - if pd.isnull(x): - return x - - # check if numeric - if np.isreal(x): - return str(int(x)) - - if str(x).isdigit(): - return str(int(x)) - return x - - asset_list[UPRN_COLUMN] = asset_list[UPRN_COLUMN].apply(convert_uprn) - - # We attempt to process the year built column - if PROPERTY_YEAR_BUILT is not None: - # We check if we have a datetime - if isinstance(asset_list[PROPERTY_YEAR_BUILT].iloc[0], datetime): - # We treat any string columns - with common values we see - datetime_remap = { - "Pre 1900": datetime(year=1899, month=12, day=31), - } - asset_list[PROPERTY_YEAR_BUILT] = asset_list[PROPERTY_YEAR_BUILT].replace(datetime_remap) - - asset_list[PROPERTY_YEAR_BUILT] = pd.to_datetime(asset_list[PROPERTY_YEAR_BUILT]) - # Convert this to year - asset_list[PROPERTY_YEAR_BUILT] = asset_list[PROPERTY_YEAR_BUILT].dt.year - - # We check for duplicated addresses - asset_list["deduper"] = asset_list[FULLADDRESS_COLUMN] + asset_list[POSTCODE_COLUMN] + ### We retrieve the EPC data # We chunk up this data into 5000 rows at a time # Create the chunks directory - if not os.path.exists(os.path.join(DATA_FOLDER, "Chunks")): - os.makedirs(os.path.join(DATA_FOLDER, "Chunks")) - chunk_size = 5000 - errors = [] - no_epc = [] + force_retrieve_data = False skip = None # Used to skip already completed chunks - for i in range(0, len(asset_list), chunk_size): + chunk_size = 5000 + filename = "Chunk {i}.csv" + download_folder = os.path.join(DATA_FOLDER, "Chunks") + if not os.path.exists(download_folder): + os.makedirs(download_folder) + + chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size)) + downloaded_files = {filename.format(i=i) for i in chunk_indexes} + + # We check if we have files associated to these files already and if we do, and we do not want to force the + # fetching of the data, we skip + folder_contents = os.listdir(download_folder) + if all(x in folder_contents for x in downloaded_files): + skip = max(chunk_indexes) + + for i in range(0, len(asset_list.standardised_asset_list), chunk_size): print(f"Processing chunk {i} to {i + chunk_size}") - if skip is not None: + if skip is not None and not force_retrieve_data: if i <= skip: continue - chunk = asset_list[i:i + chunk_size] + chunk = asset_list.standardised_asset_list[i:i + chunk_size] epc_data_chunk, errors_chunk, no_epc_chunk = get_data( asset_list=chunk, - fulladdress_column=FULLADDRESS_COLUMN, - address1_column=ADDRESS1_COLUMN, - postcode_column=POSTCODE_COLUMN, + row_id_name=asset_list.DOMNA_PROPERTY_ID, + fulladdress_column=asset_list.STANDARD_FULL_ADDRESS, + address1_column=asset_list.STANDARD_ADDRESS_1, + postcode_column=asset_list.STANDARD_POSTCODE, manual_uprn_map=MANUAL_UPRN_MAP, - uprn_column=UPRN_COLUMN + uprn_column=asset_list.STANDARD_UPRN ) # We now retrieve any failed properties - chunk_failed = chunk[chunk["row_id"].isin(errors)] + chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)] epc_data_failed, _, _ = get_data( asset_list=chunk_failed, + row_id_name=asset_list.DOMNA_PROPERTY_ID, fulladdress_column=FULLADDRESS_COLUMN, address1_column=ADDRESS1_COLUMN, postcode_column=POSTCODE_COLUMN, @@ -517,20 +452,22 @@ def app(): ) epc_data_chunk.extend(epc_data_failed) - errors.extend(errors_chunk) - no_epc.extend(no_epc_chunk) # Append the failed data to the main data # Store the chunk locally as a csv pd.DataFrame(epc_data_chunk).to_csv(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i}.csv"), index=False) + # Store the errors and no-data locally + with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} errors.json"), "w") as f: + json.dump(errors_chunk, f) + + with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} nodata.csv"), "w") as f: + json.dump(no_epc_chunk, f) # We read in and concatenate the created created chunks - chunks_folder = os.path.join(DATA_FOLDER, "Chunks") # List the contents - chunk_files = os.listdir(chunks_folder) epc_data = [] - for file in chunk_files: - csv_data = pd.read_csv(os.path.join(chunks_folder, file)) + for file in downloaded_files: + csv_data = pd.read_csv(os.path.join(download_folder, file)) # We need to convert the recommendations back to a list csv_data["recommendations"] = csv_data["recommendations"].apply(eval) csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval)