matching ecosurv data to asset list

This commit is contained in:
Khalim Conn-Kowlessar 2025-04-15 17:26:40 +01:00
parent 9e179e7f9b
commit e99f1506f9
4 changed files with 203 additions and 55 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Fastapi-backend" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="AssetList" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyNamespacePackagesService">

View file

@ -382,6 +382,8 @@ class AssetList:
self.outcomes_for_output = pd.DataFrame()
self.master_surveyed = None
self.unmatched_submissions = pd.DataFrame()
self.ecosurv = None
self.ecosurv_no_match = pd.DataFrame()
# When this is True, we intend to break the programme into multiple phases. We may need to review
# how this is structured in the future, as depending on how we get future data, we may need to
@ -1114,7 +1116,7 @@ class AssetList:
def identify_worktypes(self, cleaned):
if self.STANDARD_SAP is not None:
if self.landlord_sap is not None:
# We add a SAP category for all work type identification
self.standardised_asset_list["SAP Category"] = np.where(
(
@ -1135,16 +1137,22 @@ class AssetList:
)
else:
# We add a SAP category for all work type identification
# We break into 4 categories (54 or less, 55-68, 69-74, 75 or more)
self.standardised_asset_list["SAP Category"] = np.where(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68,
"SAP Rating 68 or less",
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54),
"SAP Rating 54 or less",
np.where(
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
self.EMPTY_CAVITY_SAP_THRESHOLD
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68),
"SAP Rating 55-68",
np.where(
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
self.EMPTY_CAVITY_SAP_THRESHOLD
),
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
),
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
)
)
@ -1406,7 +1414,12 @@ class AssetList:
elif self.old_format_non_intrusives_present:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
["retro drilled", "retro filled", "ewi", "retro drilled/ solid"]
[
"retro drilled", "retro filled", "ewi", "retro drilled/ solid", "retro drilled and filled",
]
) |
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().str.contains(
"retro drilled"
)
)
else:
@ -1565,13 +1578,6 @@ class AssetList:
solar_roof_meets_criteria
)
# We shouldn't have an overlap
if (
self.standardised_asset_list["solar_eligible"] &
self.standardised_asset_list["solar_eligible_needs_heating_upgrade"]
).sum():
raise ValueError("Both heating upgrade and no heating upgrade are true - this should not be possible")
# We check for a specific sub-set of properties which are uninsulated solid wall properties that are EPC E
# or below (we'll use 57 as a threshold) - These are for a pilot with Net Zero Renewables
self.standardised_asset_list["solar_eligible_solid_wall_uninsulated"] = (
@ -1617,27 +1623,58 @@ class AssetList:
)
# We break the cavity reason into a few different categories, when the EPC is different from inspections
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "RETRO DRILLED") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
if self.old_format_non_intrusives_present:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
[
"retro drilled and filled", "retro drilled", "retro filled", "retro drilled & filled",
]
)) &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "FILLED AT BUILD") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show filled at build: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
self.standardised_asset_list['non_intrusive_indicates_cavity_extraction'] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show filled or other: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
else:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "RETRO DRILLED") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "FILLED AT BUILD") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show filled at build: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
@ -1695,34 +1732,46 @@ class AssetList:
)
# Flag anything that has existing outcomes
if (self.outcomes is not None) and ("Surveyed" in self.standardised_asset_list.columns):
if (self.outcomes is not None) and ("surveyed" in self.standardised_asset_list.columns):
if "Installer Refusal" not in self.standardised_asset_list.columns:
if "installer refusal" not in self.standardised_asset_list.columns:
self.standardised_asset_list["cavity_reason"] = np.where(
(
(self.standardised_asset_list["Surveyed"] > 0)
(self.standardised_asset_list["surveyed"] > 0)
),
None,
self.standardised_asset_list["cavity_reason"]
)
else:
self.standardised_asset_list["cavity_reason"] = np.where(
(
(self.standardised_asset_list["Surveyed"] > 0) |
(self.standardised_asset_list["Installer Refusal"] > 0)
),
None,
self.standardised_asset_list["cavity_reason"]
)
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(self.standardised_asset_list["surveyed"] > 0) |
(self.standardised_asset_list["installer refusal"] > 0)
),
None,
self.standardised_asset_list[col]
)
if self.master_surveyed is not None:
self.standardised_asset_list["cavity_reason"] = np.where(
(
(~pd.isnull(self.standardised_asset_list["submission_date"]))
),
None,
self.standardised_asset_list["cavity_reason"]
)
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(~pd.isnull(self.standardised_asset_list["submission_date"]))
),
None,
self.standardised_asset_list[col]
)
if self.ecosurv is not None:
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(~pd.isnull(self.standardised_asset_list["ecosurv_reference"]))
),
None,
self.standardised_asset_list[col]
)
blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats"
@ -2081,6 +2130,103 @@ class AssetList:
self.hubspot_data = programme_data
def flag_ecosurv(self, ecosurv_landlords=None, ecosurv_filepath=None):
    """
    Match Ecosurv submissions to the standardised asset list.

    Reads the Ecosurv export, keeps the rows whose "Landlord" value matches
    ``ecosurv_landlords`` and tries to pair each of them with exactly one row of
    ``self.standardised_asset_list``. Candidates are found by postcode and, when a postcode
    has several properties, narrowed by house number, then by address line 1 appearing in
    the full address, and finally by discarding properties of type "other".

    Side effects:
        - ``self.ecosurv`` is set to the full Ecosurv export.
        - ``self.standardised_asset_list`` gains the columns ``ecosurv_reference``,
          ``ecosurv_address1`` and ``ecosurv_postcode`` (null where nothing matched).
        - ``self.ecosurv_no_match`` holds the Ecosurv rows that could not be paired with
          exactly one property.

    :param ecosurv_landlords: Pattern (used with ``str.contains``, so "a|b" alternation
        works) matched against the lower-cased Ecosurv "Landlord" column. When None the
        method does nothing.
    :param ecosurv_filepath: Optional path of the Ecosurv CSV export. Defaults to the
        previous hard-coded local path so existing callers are unaffected.
    :return: None
    """
    if ecosurv_landlords is None:
        return

    if ecosurv_filepath is None:
        # TODO: Fetch from Sharepoint
        ecosurv_filepath = "/Users/khalimconn-kowlessar/Documents/hestia/Ecosurv/07.csv"

    logger.info(
        "Getting Ecosurv data from %s", ecosurv_filepath
    )
    self.ecosurv = pd.read_csv(
        ecosurv_filepath,
        encoding="cp437"
    )

    # Filter directly on the landlord name. na=False drops rows without a landlord, which is
    # what the previous value_counts() based lookup did implicitly.
    landlord_ecosurv_data = self.ecosurv[
        self.ecosurv["Landlord"].str.lower().str.contains(ecosurv_landlords, na=False)
    ]

    # The asset list is not modified inside the loop, so normalise its postcodes once.
    asset_postcodes = (
        self.standardised_asset_list[self.STANDARD_POSTCODE]
        .str.replace(" ", "", regex=False)
        .str.lower()
    )

    ecosurv_columns = [
        self.STANDARD_LANDLORD_PROPERTY_ID,
        "ecosurv_reference",
        "ecosurv_address1",
        "ecosurv_postcode",
    ]

    # Try and match to asset list
    matched = []
    unmatched = []
    for _, row in tqdm(landlord_ecosurv_data.iterrows(), total=landlord_ecosurv_data.shape[0]):
        raw_postcode = row["Postcode"]
        address_1 = row["Address Line 1"]

        # A submission without a postcode can never be located
        if pd.isnull(raw_postcode):
            unmatched.append(row["Reference"])
            continue

        # Normalise exactly like the asset list side (no spaces, lower case); otherwise a
        # postcode written with a space would never compare equal.
        postcode = str(raw_postcode).replace(" ", "").lower()
        df = self.standardised_asset_list[asset_postcodes == postcode].copy()

        if df.empty:
            unmatched.append(row["Reference"])
            continue

        if df.shape[0] > 1:
            # Several properties share the postcode - narrow down on house number
            house_no = SearchEpc.get_house_number(address_1, raw_postcode)
            df["house_no"] = df.apply(
                lambda x: SearchEpc.get_house_number(
                    str(x[self.STANDARD_ADDRESS_1]), x[self.STANDARD_POSTCODE]
                ),
                axis=1
            )
            df = df[df["house_no"] == house_no]

        if df.shape[0] > 1 and isinstance(address_1, str):
            # We compare address line 1 to full address. The address is matched literally
            # (regex=False) so characters such as brackets cannot break or skew the search.
            address_mask = df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
                address_1.lower(), na=False, regex=False
            )
            # Only apply the filter when it keeps at least one candidate
            if address_mask.any():
                df = df[address_mask]

        if df.shape[0] > 1:
            df = df[df[self.STANDARD_PROPERTY_TYPE] != "other"]

        if df.shape[0] == 1:
            matched.append(
                {
                    self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
                    "ecosurv_reference": row["Reference"],
                    "ecosurv_address1": address_1,
                    "ecosurv_postcode": raw_postcode,
                }
            )
        else:
            # Either still ambiguous, or the filters removed every candidate. Both cases are
            # recorded so that no submission silently disappears.
            unmatched.append(row["Reference"])

    # We now match
    matched = pd.DataFrame(matched, columns=ecosurv_columns)

    # A left merge duplicates asset rows when several submissions point at the same property,
    # so we keep only the first submission per property.
    duplicate_matches = matched.duplicated(subset=self.STANDARD_LANDLORD_PROPERTY_ID, keep="first")
    if duplicate_matches.any():
        logger.warning(
            "%s Ecosurv submissions matched a property that already has a match; keeping the first",
            int(duplicate_matches.sum())
        )
        matched = matched[~duplicate_matches]

    if matched.empty:
        # Nothing matched: still create the columns, because later steps read
        # "ecosurv_reference" whenever self.ecosurv is set.
        for col in ecosurv_columns[1:]:
            self.standardised_asset_list[col] = None
    else:
        self.standardised_asset_list = self.standardised_asset_list.merge(
            matched,
            how="left",
            on=self.STANDARD_LANDLORD_PROPERTY_ID,
        )

    # We keep a record of submissions that were NOT matches
    self.ecosurv_no_match = self.ecosurv[
        self.ecosurv["Reference"].isin(unmatched)
    ].copy()
def flag_outcomes(
self,
outcomes_filepath,

View file

@ -124,6 +124,7 @@ def app():
]
master_to_asset_list_filepath = None
phase = False
ecosuv_landlords = "paul butler|bromford"
# Torus
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Torus/Phase 1"

View file

@ -1,4 +1,5 @@
import numpy as np
import pandas as pd
class PropertyValuation: