merged from main

This commit is contained in:
Jun-te Kim 2026-02-03 17:01:37 +00:00
parent 96a6557e4b
commit a8150e3c91

View file

@ -12,23 +12,35 @@ from asset_list.utils import get_data
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
# Load variables from backend/.env into the environment before the token is read below.
load_dotenv(dotenv_path="backend/.env")
# EPC_AUTH_TOKEN is taken from the EPC_AUTH_TOKEN environment variable; when that is
# unset, os.getenv falls back to the string literal given as its second argument.
# NOTE(review): that fallback is a token literal committed to source control —
# presumably a real credential; confirm, rotate it, and drop the default so the value
# can only come from the environment / .env file.
# The assignment appears twice in this diff view: the single-line form, then the
# reformatted multi-line form. Both pass the same arguments to os.getenv.
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=")
EPC_AUTH_TOKEN = os.getenv(
"EPC_AUTH_TOKEN",
"a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=",
)
# extract_address1: write an "address1_extracted" column onto asset_list, derived from
# the column named by full_address_col using the strategy selected by `method`, and
# return asset_list. The column is assigned on the object passed in; no copy is made
# in the visible code.
#   "first_two_words" (default)  -> the first two pieces of the address when split on
#                                   a single space, re-joined with a space
#   "first_word"                 -> the first piece of the address when split on a space
#   "house_number_extraction"    -> SearchEpc.get_house_number(address=..., postcode=...)
#                                   called once per row
# postcode_col is only read by the "house_number_extraction" branch in the visible code.
# NOTE(review): asset_list is presumably a pandas DataFrame (uses .str and
# .apply(axis=1)) — confirm against the caller.
# NOTE(review): this diff hunk ends right after the third branch, so what happens for
# any other `method` value is not visible here.
# Each changed statement below appears twice: the original single-line form followed
# by its reformatted multi-line equivalent.
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
def extract_address1(
asset_list, full_address_col, postcode_col, method="first_two_words"
):
# Strategy 1: keep the first two space-separated pieces of the full address.
if method == "first_two_words":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
asset_list["address1_extracted"] = (
asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
)
return asset_list
# Strategy 2: keep only the first space-separated piece of the full address.
if method == "first_word":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0]
asset_list["address1_extracted"] = (
asset_list[full_address_col].str.split(" ").str[0]
)
return asset_list
# Strategy 3: delegate to SearchEpc.get_house_number row by row (axis=1), passing the
# row's full address and postcode.
if method == "house_number_extraction":
asset_list["address1_extracted"] = asset_list.apply(
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
axis=1
lambda x: SearchEpc.get_house_number(
address=x[full_address_col], postcode=x[postcode_col]
),
axis=1,
)
return asset_list
@ -57,15 +69,11 @@ def app():
EPC recommendations
Property UPRN
"""
<<<<<<< HEAD
data_folder = ("/workspaces/model/asset_list")
data_filename = "assets.xlsx"
=======
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Hackney"
data_filename = "Domna SHF Wave 3 (3).xlsx"
sheet_name = "Domna Wave 3"
postcode_column = 'Postcode'
postcode_column = "Postcode"
address1_column = "Address 1"
address1_method = None
fulladdress_column = None
@ -96,15 +104,16 @@ def app():
landlord_block_reference = None
# Peabody data for cleaning
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation")
data_folder = (
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation"
)
data_filename = "to_standardise_uprns.xlsx"
>>>>>>> 3874da6177cbcc37f7a488bec0a06e387906653c
sheet_name = "Sheet1"
postcode_column = 'Postcode'
postcode_column = "Postcode"
address1_column = None
address1_method = 'house_number_extraction'
fulladdress_column = 'Address'
address1_method = "house_number_extraction"
fulladdress_column = "Address"
address_cols_to_concat = None
missing_postcodes_method = None
landlord_year_built = None
@ -155,49 +164,62 @@ def app():
landlord_existing_pv=landlord_existing_pv,
landlord_sap=landlord_sap,
landlord_block_reference=landlord_block_reference,
phase=phase
phase=phase,
)
asset_list.init_standardise()
# We produce the new maps, which can be saved for future usage
new_property_type_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_property_type] if
asset_list.landlord_property_type else {}
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_property_type]
if asset_list.landlord_property_type
else {}
).items()
if k not in PROPERTY_MAPPING
}
new_built_form_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_built_form] if
asset_list.landlord_built_form else {}
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_built_form]
if asset_list.landlord_built_form
else {}
).items()
if k not in BUILT_FORM_MAPPINGS
}
new_wall_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_wall_construction] if
asset_list.landlord_wall_construction else {}
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_wall_construction]
if asset_list.landlord_wall_construction
else {}
).items()
if k not in WALL_CONSTRUCTION_MAPPINGS
}
new_heating_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_heating_system] if
asset_list.landlord_heating_system else {}
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_heating_system]
if asset_list.landlord_heating_system
else {}
).items()
if k not in HEATING_MAPPINGS
}
new_existing_pv_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_existing_pv] if asset_list.landlord_existing_pv else {}
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_existing_pv]
if asset_list.landlord_existing_pv
else {}
).items()
if k not in EXISTING_PV_MAPPINGS
}
new_roof_construction_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_roof_construction] if
asset_list.landlord_roof_construction else {}
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_roof_construction]
if asset_list.landlord_roof_construction
else {}
).items()
if k not in ROOF_CONSTRUCTION_MAPPINGS
}
@ -211,7 +233,7 @@ def app():
outcomes_address=outcomes_address,
outcomes_postcode=outcomes_postcode,
outcomes_houseno=outcomes_houseno,
outcomes_id=outcomes_id
outcomes_id=outcomes_id,
)
asset_list.flag_survey_master(
@ -245,14 +267,16 @@ def app():
skip = max(chunk_indexes)
if any(x in folder_contents for x in downloaded_files):
skip = max([i for i in chunk_indexes if filename.format(i=i) in folder_contents])
skip = max(
[i for i in chunk_indexes if filename.format(i=i) in folder_contents]
)
for i in range(0, len(asset_list.standardised_asset_list), chunk_size):
print(f"Processing chunk {i} to {i + chunk_size}")
if skip is not None and not force_retrieve_data:
if i <= skip:
continue
chunk = asset_list.standardised_asset_list[i:i + chunk_size]
chunk = asset_list.standardised_asset_list[i : i + chunk_size]
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
df=chunk,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
@ -264,7 +288,7 @@ def app():
built_form_column=AssetList.STANDARD_BUILT_FORM,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only,
epc_auth_token=EPC_AUTH_TOKEN
epc_auth_token=EPC_AUTH_TOKEN,
)
# We now retrieve any failed properties
@ -287,7 +311,9 @@ def app():
# Append the failed data to the main data
# Store the chunk locally as a csv
pd.DataFrame(epc_data_chunk).to_csv(os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False)
pd.DataFrame(epc_data_chunk).to_csv(
os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False
)
# Store the errors and no-data locally
with open(os.path.join(data_folder, f"Chunks/Chunk {i} errors.json"), "w") as f:
json.dump(errors_chunk, f)
@ -318,7 +344,9 @@ def app():
unique_recommendations = set()
for _, row in recommendations_df.iterrows():
unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])
unique_recommendations.update(
[rec["improvement-summary-text"] for rec in row["recommendations"]]
)
columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations)
transformed_data = []
@ -338,20 +366,24 @@ def app():
transformed_df = pd.DataFrame(transformed_data)
for col in [
"Floor insulation (solid floor)",
"Floor insulation", "Floor insulation (suspended floor)"
"Floor insulation",
"Floor insulation (suspended floor)",
]:
if col not in transformed_df.columns:
transformed_df[col] = False
transformed_df = transformed_df[
[
asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)",
"Floor insulation", "Floor insulation (suspended floor)"
asset_list.DOMNA_PROPERTY_ID,
"Floor insulation (solid floor)",
"Floor insulation",
"Floor insulation (suspended floor)",
]
]
transformed_df["epc_has_floor_recommendation"] = (
transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] |
transformed_df["Floor insulation (suspended floor)"]
transformed_df["Floor insulation (solid floor)"]
| transformed_df["Floor insulation"]
| transformed_df["Floor insulation (suspended floor)"]
)
# Get the find my epc data
@ -364,21 +396,20 @@ def app():
find_my_epc_data.append(
{
asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID],
**x["find_my_epc_data"]
**x["find_my_epc_data"],
}
)
else:
find_my_epc_data.append(
{
asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID]
}
{asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID]}
)
find_my_epc_data = pd.DataFrame(find_my_epc_data)
find_my_epc_data = find_my_epc_data.merge(
transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]],
how="left", on=asset_list.DOMNA_PROPERTY_ID
how="left",
on=asset_list.DOMNA_PROPERTY_ID,
)
# We check if we get the solar pv column:
@ -388,24 +419,26 @@ def app():
# Retrieve just the data we need
epc_df = epc_df[
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
].rename(
columns=asset_list.EPC_API_DATA_NAMES
)
].rename(columns=asset_list.EPC_API_DATA_NAMES)
# Look for columns not in the find my EPC data, which will have happened if we didn't
# retrieve it in the first place
missed_find_epc_cols = [c for c in list(asset_list.FIND_EPC_DATA_NAMES.keys()) if c not in find_my_epc_data.columns]
missed_find_epc_cols = [
c
for c in list(asset_list.FIND_EPC_DATA_NAMES.keys())
if c not in find_my_epc_data.columns
]
if missed_find_epc_cols:
for c in missed_find_epc_cols:
find_my_epc_data[c] = None
epc_df = epc_df.merge(
find_my_epc_data[
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys())
]
.rename(columns=asset_list.FIND_EPC_DATA_NAMES),
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]
+ list(asset_list.FIND_EPC_DATA_NAMES.keys())
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
how="left",
on=asset_list.DOMNA_PROPERTY_ID
on=asset_list.DOMNA_PROPERTY_ID,
)
asset_list.merge_data(epc_df)
@ -422,7 +455,10 @@ def app():
asset_list.get_work_figures()
# Store as an excel
filename = os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + " - Standardised.xlsx"
filename = (
os.path.join(data_folder, ".".join(data_filename.split(".")[:-1]))
+ " - Standardised.xlsx"
)
# Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
# Determine inspections priority
@ -446,26 +482,42 @@ def app():
# )
with pd.ExcelWriter(filename) as writer:
asset_list.standardised_asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False)
asset_list.standardised_asset_list.to_excel(
writer, sheet_name="Standardised Asset List", index=False
)
if asset_list.block_analysis_df is not None:
asset_list.block_analysis_df.to_excel(writer, sheet_name="Block Analysis", index=False)
asset_list.block_analysis_df.to_excel(
writer, sheet_name="Block Analysis", index=False
)
# If we have outcomes, we add a tab with the outcomes
if not asset_list.outcomes_for_output.empty:
asset_list.outcomes_for_output.to_excel(writer, sheet_name="Outcomes", index=False)
asset_list.outcomes_for_output.to_excel(
writer, sheet_name="Outcomes", index=False
)
if not asset_list.unmatched_submissions.empty:
asset_list.unmatched_submissions.to_excel(writer, sheet_name="Unmatched Submissions", index=False)
asset_list.unmatched_submissions.to_excel(
writer, sheet_name="Unmatched Submissions", index=False
)
if not asset_list.outcomes_no_match.empty:
asset_list.outcomes_no_match.to_excel(writer, sheet_name="Unmatched Outcomes", index=False)
asset_list.outcomes_no_match.to_excel(
writer, sheet_name="Unmatched Outcomes", index=False
)
if not asset_list.ecosurv_no_match.empty:
asset_list.ecosurv_no_match.to_excel(writer, sheet_name="Unmatched Ecosurv", index=False)
asset_list.ecosurv_no_match.to_excel(
writer, sheet_name="Unmatched Ecosurv", index=False
)
if not asset_list.geographical_areas.empty:
asset_list.geographical_areas.to_excel(writer, sheet_name="Geographical Areas", index=False)
asset_list.geographical_areas.to_excel(
writer, sheet_name="Geographical Areas", index=False
)
# Store dupes
if asset_list.duplicated_addresses is not None:
if not asset_list.duplicated_addresses.empty:
asset_list.duplicated_addresses.to_excel(writer, sheet_name="Duplicate Properties", index=False)
asset_list.duplicated_addresses.to_excel(
writer, sheet_name="Duplicate Properties", index=False
)