mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
standardising asset list for livewest
This commit is contained in:
parent
1d48ede60e
commit
1d0c8a3e43
11 changed files with 468 additions and 44 deletions
2
.idea/Model.iml
generated
2
.idea/Model.iml
generated
|
|
@ -7,7 +7,7 @@
|
|||
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
|
||||
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
|
||||
</content>
|
||||
<orderEntry type="jdk" jdkName="Fastapi-backend" jdkType="Python SDK" />
|
||||
<orderEntry type="jdk" jdkName="AssetList" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
<component name="PyNamespacePackagesService">
|
||||
|
|
|
|||
2
.idea/misc.xml
generated
2
.idea/misc.xml
generated
|
|
@ -3,7 +3,7 @@
|
|||
<component name="Black">
|
||||
<option name="sdkName" value="Python 3.10 (backend)" />
|
||||
</component>
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Fastapi-backend" project-jdk-type="Python SDK" />
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="AssetList" project-jdk-type="Python SDK" />
|
||||
<component name="PyCharmProfessionalAdvertiser">
|
||||
<option name="shown" value="true" />
|
||||
</component>
|
||||
|
|
|
|||
|
|
@ -354,7 +354,10 @@ class AssetList:
|
|||
self.local_filepath = local_filepath
|
||||
self.sheet_name = sheet_name
|
||||
# Read in the data
|
||||
self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
|
||||
if local_filepath.endswith(".xlsx"):
|
||||
self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
|
||||
else:
|
||||
self.raw_asset_list = pd.read_csv(local_filepath)
|
||||
self.standardised_asset_list = self.raw_asset_list.copy()
|
||||
# Will be used to store aggregated figures against the various work types
|
||||
self.work_type_figures = {}
|
||||
|
|
@ -442,6 +445,9 @@ class AssetList:
|
|||
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
|
||||
axis=1
|
||||
)
|
||||
|
||||
for _, x in asset_list.iterrows():
|
||||
SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col])
|
||||
return asset_list
|
||||
|
||||
raise ValueError(f"Method {method} not recognized")
|
||||
|
|
@ -509,6 +515,18 @@ class AssetList:
|
|||
return str(int(x))
|
||||
return x
|
||||
|
||||
@staticmethod
def _clean_postcode(postcode):
    """
    Normalise the spacing of a postcode.

    Runs of whitespace (including non-breaking spaces) are collapsed to a single space and
    leading/trailing whitespace is removed. If the result contains no space at all, one is
    inserted before the final three characters, e.g. "AB12CD" -> "AB1 2CD".

    :param postcode: Postcode value; anything that is not a string is returned unchanged
    :return: The postcode with normalised spacing
    """
    # Non-string values are handed back untouched rather than raising an AttributeError.
    if not isinstance(postcode, str):
        return postcode

    # str.split() with no arguments splits on any whitespace run, including "\xa0",
    # so this handles double spaces, non-breaking spaces and stray padding in one pass.
    postcode = " ".join(postcode.split())

    # Already spaced, or too short to split into two parts: nothing to restructure
    if " " in postcode or len(postcode) <= 3:
        return postcode

    # Restructure it: the last three characters form the second part
    return f"{postcode[:-3]} {postcode[-3:]}"
|
||||
|
||||
def init_standardise(self):
|
||||
"""
|
||||
This function is used to standardise the asset list
|
||||
|
|
@ -518,6 +536,10 @@ class AssetList:
|
|||
# Remove rows without a postcode
|
||||
if self.postcode_colname is not None:
|
||||
self.standardised_asset_list = self.standardised_asset_list.dropna(subset=[self.postcode_colname])
|
||||
# We also clean the postcode column: where a postcode has no space, we insert one
|
||||
self.standardised_asset_list[self.postcode_colname] = self.standardised_asset_list[
|
||||
self.postcode_colname
|
||||
].apply(self._clean_postcode)
|
||||
|
||||
# We clean up potential non-breaking spaces, and double spaces
|
||||
for col in [
|
||||
|
|
@ -667,7 +689,8 @@ class AssetList:
|
|||
"#MULTIVALUE",
|
||||
"This cell has an external reference that can't be shown or edited. Editing this cell will "
|
||||
"remove the external reference.",
|
||||
"ND"
|
||||
"ND",
|
||||
'PIMSS EMPTY'
|
||||
]
|
||||
|
||||
if pd.isnull(date_str) or date_str in known_errors:
|
||||
|
|
@ -693,7 +716,7 @@ class AssetList:
|
|||
if str(date_str).isdigit() & (len(str(date_str)) == 4):
|
||||
return int(date_str)
|
||||
|
||||
raise NotImplementedError("Unhandled format for year built - implement me")
|
||||
raise NotImplementedError(f"Unhandled format for year built, value is {date_str} - implement me")
|
||||
|
||||
self.standardised_asset_list[self.landlord_year_built] = self.standardised_asset_list[
|
||||
self.landlord_year_built
|
||||
|
|
@ -2376,12 +2399,12 @@ class AssetList:
|
|||
outcomes_filepath,
|
||||
outcomes_sheetname,
|
||||
outcomes_postcode,
|
||||
outcomes_houseno
|
||||
outcomes_houseno,
|
||||
outcomes_id
|
||||
):
|
||||
if outcomes_filepath is None:
|
||||
return
|
||||
|
||||
# TODO: Parameterise for future use?
|
||||
self.outcomes = pd.read_excel(outcomes_filepath, sheet_name=outcomes_sheetname)
|
||||
self.outcomes["row_id"] = self.outcomes.index
|
||||
|
||||
|
|
@ -2390,6 +2413,26 @@ class AssetList:
|
|||
lookup = []
|
||||
nomatch = []
|
||||
for _, x in tqdm(self.outcomes.iterrows(), total=len(self.outcomes)):
|
||||
|
||||
# Check if we have an id
|
||||
oid = x[outcomes_id] if outcomes_id is not None else None
|
||||
|
||||
if oid is not None:
|
||||
matched = self.standardised_asset_list[
|
||||
(self.standardised_asset_list[
|
||||
self.STANDARD_LANDLORD_PROPERTY_ID
|
||||
].str.strip() == oid)
|
||||
]
|
||||
|
||||
if matched.shape[0] == 1:
|
||||
lookup.append(
|
||||
{
|
||||
"row_id": x["row_id"],
|
||||
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
address_clean = x["Address"].lower().replace(",", "").replace(" ", " ")
|
||||
|
||||
matched = self.standardised_asset_list[
|
||||
|
|
@ -2407,20 +2450,6 @@ class AssetList:
|
|||
)
|
||||
continue
|
||||
|
||||
if "UPRN" in x:
|
||||
matched = self.standardised_asset_list[
|
||||
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] == x["UPRN"]
|
||||
]
|
||||
|
||||
if matched.shape[0] == 1:
|
||||
lookup.append(
|
||||
{
|
||||
"row_id": x["row_id"],
|
||||
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
matched = self.standardised_asset_list[
|
||||
(self.standardised_asset_list[self.STANDARD_POSTCODE] == x[outcomes_postcode])
|
||||
].copy()
|
||||
|
|
@ -2459,6 +2488,9 @@ class AssetList:
|
|||
self.outcomes_no_match = self.outcomes[self.outcomes["row_id"].isin(nomatch)]
|
||||
lookup = pd.DataFrame(lookup)
|
||||
|
||||
if lookup.empty:
|
||||
return
|
||||
|
||||
# We will have duplicated domna property IDs, where a surveyor has been to a property multiple times
|
||||
# Where we have multiple rows, we want to make a call on what the action should be. For example,
|
||||
# there may be properties that have been visited multiple times where the outcome was "See notes" implying
|
||||
|
|
@ -2529,9 +2561,13 @@ class AssetList:
|
|||
else "INSTALL / CANCELLATION DATE"
|
||||
)
|
||||
|
||||
submission_col = (
|
||||
"SUBMISSION DATE" if "SUBMISSION DATE" in master_data.columns else "SUBMISSION DATE TO INSTALLERS"
|
||||
)
|
||||
|
||||
# We just need to check if any were cancelled
|
||||
master_to_append = master_data[
|
||||
["UPRN", install_col, "SUBMISSION DATE"]
|
||||
["UPRN", install_col, submission_col]
|
||||
].rename(columns={"UPRN": self.STANDARD_LANDLORD_PROPERTY_ID, install_col: "survey_status"})
|
||||
master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel")
|
||||
|
||||
|
|
|
|||
|
|
@ -88,6 +88,67 @@ def app():
|
|||
# - We want: fully insulated property (all wall types), EPC D or below (floors should be solid)
|
||||
# - Or the insulation required is loft/cavity (floors should be solid)
|
||||
|
||||
# Live West (2018 Asset list)
|
||||
data_folder = (
|
||||
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/Programme Update - March 2025/2018 Asset List"
|
||||
)
|
||||
data_filename = "LIVEWEST STOCK - 23rd October 2018.xlsx"
|
||||
sheet_name = "Assets"
|
||||
postcode_column = 'Postcode'
|
||||
fulladdress_column = "Address"
|
||||
address1_column = None
|
||||
address1_method = "house_number_extraction"
|
||||
address_cols_to_concat = []
|
||||
missing_postcodes_method = None
|
||||
landlord_year_built = "Build Year"
|
||||
landlord_os_uprn = None
|
||||
landlord_property_type = "Property Archetype"
|
||||
landlord_built_form = None
|
||||
landlord_wall_construction = None
|
||||
landlord_heating_system = "Heating Fuel Type"
|
||||
landlord_existing_pv = None
|
||||
landlord_property_id = "Uprn - DO NOT DELETE"
|
||||
outcomes_filename = "RT - LiveWest.xlsx"
|
||||
outcomes_sheetname = "Feedback"
|
||||
outcomes_postcode = "Poscode"
|
||||
outcomes_houseno = "No."
|
||||
outcomes_id = "UPRN"
|
||||
master_filepaths = [
|
||||
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/Programme Update - March 2025/Rolling Master "
|
||||
"- redacted for analysis/CAVITY-Table 1.csv"
|
||||
]
|
||||
master_to_asset_list_filepath = None
|
||||
|
||||
# Live West (South West asset list)
|
||||
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/Programme Update - March "
|
||||
"2025/Livewest Asset List (Original) - csv")
|
||||
data_filename = "Report-Table 1.csv"
|
||||
sheet_name = None
|
||||
postcode_column = 'Postcode'
|
||||
fulladdress_column = "T1_Address"
|
||||
address1_column = None
|
||||
address1_method = "house_number_extraction"
|
||||
address_cols_to_concat = []
|
||||
missing_postcodes_method = None
|
||||
landlord_year_built = "Build Yr"
|
||||
landlord_os_uprn = None
|
||||
landlord_property_type = "T1_AssetType"
|
||||
landlord_built_form = "T1_AssetType"
|
||||
landlord_wall_construction = "Wall Type Cavity"
|
||||
landlord_heating_system = "Heating Fuel"
|
||||
landlord_existing_pv = None
|
||||
landlord_property_id = "T1_UPRN"
|
||||
outcomes_filename = "RT - LiveWest.xlsx"
|
||||
outcomes_sheetname = "Feedback"
|
||||
outcomes_postcode = "Poscode"
|
||||
outcomes_houseno = "No."
|
||||
outcomes_id = "UPRN"
|
||||
master_filepaths = [
|
||||
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/Programme Update - March 2025/Rolling Master "
|
||||
"- redacted for analysis/CAVITY-Table 1.csv"
|
||||
]
|
||||
master_to_asset_list_filepath = None
|
||||
|
||||
# PFP East
|
||||
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Places For People/East"
|
||||
data_filename = "PFP EAST - Master - DN LN NG NR PE POSTCODES.xlsx"
|
||||
|
|
@ -218,6 +279,7 @@ def app():
|
|||
# landlord_year_built = "YEAR BUILT"
|
||||
# landlord_os_uprn = None
|
||||
# landlord_property_type = "Property type"
|
||||
# landlord_built_form = None
|
||||
# landlord_wall_construction = "Wall Constuction"
|
||||
# landlord_heating_system = "Heating"
|
||||
# landlord_existing_pv = None
|
||||
|
|
@ -325,7 +387,8 @@ def app():
|
|||
outcomes_filepath=os.path.join(data_folder, outcomes_filename) if outcomes_filename else None,
|
||||
outcomes_sheetname=outcomes_sheetname,
|
||||
outcomes_postcode=outcomes_postcode,
|
||||
outcomes_houseno=outcomes_houseno
|
||||
outcomes_houseno=outcomes_houseno,
|
||||
outcomes_id=outcomes_id
|
||||
)
|
||||
|
||||
asset_list.flag_survey_master(
|
||||
|
|
@ -340,7 +403,7 @@ def app():
|
|||
epc_api_only = False
|
||||
force_retrieve_data = False
|
||||
skip = None # Used to skip already completed chunks
|
||||
chunk_size = 5000
|
||||
chunk_size = 2000
|
||||
filename = "Chunk {i}.csv"
|
||||
download_folder = os.path.join(data_folder, "Chunks")
|
||||
if not os.path.exists(download_folder):
|
||||
|
|
@ -355,6 +418,8 @@ def app():
|
|||
if all(x in folder_contents for x in downloaded_files):
|
||||
skip = max(chunk_indexes)
|
||||
|
||||
# folder_contents = [f for f in folder_contents if "nodata" not in f and f.endswith(".csv")]
|
||||
|
||||
for i in range(0, len(asset_list.standardised_asset_list), chunk_size):
|
||||
print(f"Processing chunk {i} to {i + chunk_size}")
|
||||
if skip is not None and not force_retrieve_data:
|
||||
|
|
@ -418,8 +483,6 @@ def app():
|
|||
epc_df = pd.concat(epc_data)
|
||||
epc_df["estimated"] = epc_df["estimated"].fillna(False)
|
||||
|
||||
epc_df["number-habitable-rooms"].mean() + 1
|
||||
|
||||
# We expand out the recommendations
|
||||
recommendations_df = epc_df[[asset_list.DOMNA_PROPERTY_ID, "recommendations"]]
|
||||
|
||||
|
|
|
|||
|
|
@ -40,5 +40,20 @@ BUILT_FORM_MAPPINGS = {
|
|||
'House': 'unknown',
|
||||
'Second Floor Flat': 'mid-floor',
|
||||
'First Floor Flat': 'ground floor',
|
||||
'Room Only': 'unknown'
|
||||
'Room Only': 'unknown',
|
||||
|
||||
'End Terrace Housex': 'end-terrace',
|
||||
'Mid Terrace Bungalow': 'mid-terrace',
|
||||
'End Terrace Bungalow': 'end-terrace',
|
||||
'Mid Terrace House': 'mid-terrace',
|
||||
'Detached Bungalow': 'detached',
|
||||
'End Terrace House': 'end-terrace',
|
||||
'Mid Terrace Housekeeping ': 'mid-terrace',
|
||||
'Semi Detached Bung': 'semi-detached',
|
||||
'Guest Room': 'unknown',
|
||||
'Coach House': 'detached',
|
||||
'Office Buildings': 'unknown',
|
||||
'Maisonnette': 'mid-floor',
|
||||
'Bedspace': 'unknown'
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,12 @@ STANDARD_HEATING_SYSTEMS = {
|
|||
"unknown",
|
||||
"communal gas boiler",
|
||||
"high heat retention storage heaters",
|
||||
"room heaters"
|
||||
"room heaters",
|
||||
'electric fuel',
|
||||
'oil fuel',
|
||||
'solid fuel',
|
||||
'gas combi boiler',
|
||||
'unknown'
|
||||
}
|
||||
|
||||
HEATING_MAPPINGS = {
|
||||
|
|
@ -106,5 +111,16 @@ HEATING_MAPPINGS = {
|
|||
'Quantum storage heaters (Old SH on EPC)': 'high heat retention storage heaters',
|
||||
'Quantum storage heaters': 'high heat retention storage heaters',
|
||||
'Air Source (EPC says SH)': 'air source heat pump',
|
||||
'ASHP - Was logged as oil': 'air source heat pump'
|
||||
'ASHP - Was logged as oil': 'air source heat pump',
|
||||
'Ground Source': 'ground source heat pump',
|
||||
'District Heating': 'district heating',
|
||||
'Mains Gas (Communal)': 'communal gas boiler',
|
||||
'LPG': 'boiler - other fuel',
|
||||
'Mains Gas': 'gas condensing boiler',
|
||||
|
||||
'ELECTRIC': 'electric fuel',
|
||||
'OIL': 'oil fuel',
|
||||
'SOLID FUEL': 'solid fuel',
|
||||
'GAS': 'gas combi boiler',
|
||||
'DO NOT SURVEY': 'unknown'
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,5 +92,28 @@ PROPERTY_MAPPING = {
|
|||
'Guest room in a complex': 'other',
|
||||
'PIMSS EMPTY': 'bedsit',
|
||||
'Room Only': 'other',
|
||||
'Detached Property': 'house'
|
||||
'Detached Property': 'house',
|
||||
'End Terrace Housex': 'house',
|
||||
'Coach House': 'coach house',
|
||||
'Mid Terrace Bungalow': 'bungalow',
|
||||
'End Terrace Bungalow': 'bungalow',
|
||||
'Mid Terrace House': 'house',
|
||||
'Detached Bungalow': 'bungalow',
|
||||
'End Terrace House': 'house',
|
||||
'Mid Terrace Housekeeping ': 'house',
|
||||
'Maisonnette': 'maisonette',
|
||||
'Guest Room': 'unknown',
|
||||
'Office Buildings': 'unknown',
|
||||
'Semi Detached Bung': 'bungalow',
|
||||
'Bedspace': 'bedsit',
|
||||
|
||||
'Houses/Bungalows': 'bungalow',
|
||||
'Bedsits': 'bedsit',
|
||||
'Unknown': 'unknown',
|
||||
'Sheltered Flats/besits': 'flat',
|
||||
'House/Bungalow ': 'bungalow',
|
||||
'Low/Med Rise Flats/Mais': 'flat',
|
||||
'Staff/Comm': 'other',
|
||||
'A Rooms': 'other'
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -135,5 +135,6 @@ WALL_CONSTRUCTION_MAPPINGS = {
|
|||
'Solid brick EWI installed': 'insulated solid brick',
|
||||
'Cavity Cavity batts': 'filled cavity',
|
||||
'Cavity CWI Completed by Dyson': 'filled cavity',
|
||||
None: "unknown"
|
||||
None: "unknown",
|
||||
"Cavity": "cavity unknown insulation",
|
||||
}
|
||||
|
|
|
|||
|
|
@ -207,12 +207,12 @@ class SearchEpc:
|
|||
|
||||
try:
|
||||
# Updated regex to catch house numbers including alphanumeric ones
|
||||
pattern = r'(?i)(?:flat|apartment)\s*(\d+\w*)|^\s*(\d+\w*)'
|
||||
pattern = r'(?i)(?:flat|apartment|room)\s*(\d+\w*)|^\s*(\d+\w*)'
|
||||
match1 = re.search(pattern, address)
|
||||
if match1:
|
||||
return next(g for g in match1.groups() if g is not None)
|
||||
|
||||
pattern2 = r'(?i)(flat|apartment)\s*([a-zA-Z]?\d+[a-zA-Z]?)'
|
||||
pattern2 = r'(?i)(flat|apartment|room)\s*([a-zA-Z]?\d+[a-zA-Z]?)'
|
||||
match2 = re.search(pattern2, address)
|
||||
if match2:
|
||||
return match2.group(2)
|
||||
|
|
@ -226,8 +226,8 @@ class SearchEpc:
|
|||
continue
|
||||
if part == postcode.split(" ")[1]:
|
||||
continue
|
||||
return part.rstrip(
|
||||
",") # This assumes the first 'OccupancyIdentifier' after 'OccupancyType' is the primary
|
||||
return part.rstrip(",")
|
||||
# This assumes the first 'OccupancyIdentifier' after 'OccupancyType' is the primary
|
||||
# number
|
||||
|
||||
# Fallback to 'AddressNumber' if no 'OccupancyIdentifier' is found
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ def app():
|
|||
)
|
||||
|
||||
property_asset_data["is_pitched"] = property_asset_data["roof"].str.contains("pitched", case=False)
|
||||
property_asset_data["pre_2002"] = property_asset_data["BUILD_YEAR"] < 2002
|
||||
property_asset_data["pre_1970"] = property_asset_data["BUILD_YEAR"] < 1970
|
||||
property_asset_data["wall_type"] = property_asset_data["walls"].str.split(" ").str[0].str.strip()
|
||||
property_asset_data["is_insulated"] = (
|
||||
property_asset_data["walls"].str.split(",").str[1].str.strip().isin(
|
||||
|
|
@ -111,11 +111,11 @@ def app():
|
|||
property_asset_data["is_pitched"] = np.where(
|
||||
property_asset_data["is_pitched"], "Pitched roof", "Not Pitched Roof"
|
||||
)
|
||||
property_asset_data["pre_2002"] = np.where(
|
||||
property_asset_data["pre_2002"], "Pre 2002", "Post 2002"
|
||||
property_asset_data["pre_1970"] = np.where(
|
||||
property_asset_data["pre_1970"], "Pre 1970", "Post 1970"
|
||||
)
|
||||
|
||||
archetype_variables = ["property_type", "wall_type", "is_insulated", "is_pitched", "pre_2002"]
|
||||
archetype_variables = ["property_type", "wall_type", "is_insulated", "is_pitched", "pre_1970"]
|
||||
|
||||
assigned_archetypes = (
|
||||
property_asset_data.groupby(
|
||||
|
|
@ -129,8 +129,8 @@ def app():
|
|||
)
|
||||
|
||||
# Most prominent archetypes
|
||||
prominent_archetypes = assigned_archetypes.head(3)
|
||||
other_archetypes = assigned_archetypes.tail(-3)
|
||||
prominent_archetypes = assigned_archetypes.head(6)
|
||||
other_archetypes = assigned_archetypes.tail(-6)
|
||||
# 2 or fewer properties in the other archetypes
|
||||
|
||||
property_asset_data = property_asset_data.merge(
|
||||
|
|
@ -195,6 +195,13 @@ def app():
|
|||
reset_index()
|
||||
.rename(columns={"archetype_group": "Archetype"})
|
||||
)
|
||||
property_types = (
|
||||
(property_asset_data["property_type"] + ": " + property_asset_data["built_form"]).
|
||||
value_counts().
|
||||
to_frame().
|
||||
reset_index()
|
||||
.rename(columns={"index": "Property Type", 0: "Count"})
|
||||
)
|
||||
|
||||
# epc breakdown
|
||||
epc_breakdown = (
|
||||
|
|
@ -345,6 +352,11 @@ def app():
|
|||
df["relative_kwh_savings"] = df["kwh_savings"] / df["current_energy_demand"]
|
||||
df["relative_bill_savings"] = df["energy_cost_savings"] / df["bills_total_cost"]
|
||||
|
||||
# Add on the archetype
|
||||
df = df.merge(
|
||||
property_asset_data[["uprn", "archetype_group"]], how="left", left_on="UPRN", right_on="uprn"
|
||||
)
|
||||
|
||||
# For properties that don't make it to EPC B, check why. E.g. for a property that has an oil boiler, it
|
||||
# the bills go up recommending HHRSH, so it doesn't make it to EPC B
|
||||
# For mid-terrace units, use the ordnance survey API to check if there is space for a heat pump?
|
||||
|
|
@ -451,8 +463,190 @@ def app():
|
|||
pprint(scenario_metrics[scenario_ids[0]])
|
||||
pprint(scenario_metrics[scenario_ids[1]])
|
||||
|
||||
# TODO: Add a slide on valuation improvement, on a sample of properties?
|
||||
scenario_data[scenario_ids[0]]["loft_insulation"][
|
||||
scenario_data[scenario_ids[0]]["loft_insulation"] > 0
|
||||
].mean()
|
||||
|
||||
# TODO: Read in costing data and breakdown
|
||||
scenario_data[scenario_ids[0]]["cavity_wall_insulation"][
|
||||
scenario_data[scenario_ids[0]]["cavity_wall_insulation"] > 0
|
||||
].mean()
|
||||
|
||||
zz = scenario_recommendations_df[scenario_recommendations_df["type"] == "mechanical_ventilation"]
|
||||
# Testing: checking flood risk
|
||||
|
||||
import requests
|
||||
|
||||
def get_flood_risk(lat, lon, radius_km=1, timeout=30):
    """
    Query the environment.data.gov.uk flood-monitoring "floods" endpoint for warnings near a
    point, print a short summary of each one and return them.

    :param lat: Latitude of the point, sent as the `lat` query parameter
    :param lon: Longitude of the point, sent as the `long` query parameter
    :param radius_km: Search radius in km, sent as the `dist` query parameter
    :param timeout: Seconds to wait for the API before giving up. Without a timeout a stalled
        connection would block the caller indefinitely.
    :return: The list under "items" in the JSON response (empty list if there is none)
    :raises requests.HTTPError: If the API responds with an error status
    """
    url = "https://environment.data.gov.uk/flood-monitoring/id/floods"
    params = {
        'lat': lat,
        'long': lon,
        'dist': radius_km  # search radius in km
    }

    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()

    flood_warnings = response.json().get("items", [])

    if not flood_warnings:
        print("No active flood warnings near this location.")
        return flood_warnings

    print(f"{len(flood_warnings)} warning(s) found near the location:")
    for warning in flood_warnings:
        print(f"- Area: {warning.get('description')}")
        print(f" Severity: {warning.get('severity')} (Level {warning.get('severityLevel')})")
        print(f" Message changed at: {warning.get('timeMessageChanged')}")
        print()

    return flood_warnings
|
||||
|
||||
from shapely.geometry import shape, Point
|
||||
def get_flood_areas_near_point(lat, lon, radius_km=2, timeout=30):
    """
    Fetch the flood areas near a point from the environment.data.gov.uk flood-monitoring
    "floodAreas" endpoint.

    :param lat: Latitude of the point, sent as the `lat` query parameter
    :param lon: Longitude of the point, sent as the `long` query parameter
    :param radius_km: Search radius, sent as the `dist` query parameter
    :param timeout: Seconds to wait for the API before giving up. Without a timeout a stalled
        connection would block the caller indefinitely.
    :return: The list under "items" in the JSON response (empty list if there is none)
    :raises requests.HTTPError: If the API responds with an error status
    """
    url = "https://environment.data.gov.uk/flood-monitoring/id/floodAreas"
    params = {
        'lat': lat,
        'long': lon,
        'dist': radius_km
    }

    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json().get("items", [])
|
||||
|
||||
def point_in_flood_area(lat, lon, timeout=30):
    """
    Find the first nearby flood area whose polygon contains the given point.

    Flood areas within 1km are fetched via `get_flood_areas_near_point`; for each one the
    GeoJSON behind its "polygon" URL is downloaded and the geometry of its first feature is
    tested against the point.

    :param lat: Latitude of the point
    :param lon: Longitude of the point
    :param timeout: Seconds to wait for each polygon download before giving up
    :return: The flood area dict that contains the point, or None if no area does
    :raises requests.HTTPError: If a polygon download responds with an error status
    """
    flood_areas = get_flood_areas_near_point(lat, lon, radius_km=1)
    point = Point(lon, lat)  # GeoJSON uses (lon, lat) format

    for area in flood_areas:
        polygon_url = area.get("polygon")
        if not polygon_url:
            continue

        polygon_response = requests.get(polygon_url, timeout=timeout)
        polygon_response.raise_for_status()

        features = polygon_response.json().get("features", [])
        if not features:
            continue

        # NOTE(review): only the first feature is tested - confirm that each polygon document
        # holds a single feature, otherwise further features are ignored.
        flood_polygon = shape(features[0]['geometry'])

        try:
            is_inside = flood_polygon.contains(point)
        except Exception:
            # Best effort: a geometry that cannot be evaluated is treated as not containing
            # the point. Unlike a bare `except`, this lets KeyboardInterrupt/SystemExit through.
            is_inside = False

        if is_inside:
            # .get() so that a missing label/notation cannot turn a positive match into a KeyError
            print(f"📍 Point is inside flood area: {area.get('label')} ({area.get('notation')})")
            return area

    return None
|
||||
|
||||
from tqdm import tqdm
|
||||
floor_warnings_data = []
|
||||
for _, property in tqdm(property_asset_data.iterrows(), total=len(property_asset_data)):
|
||||
# warnings = floor_warnings_data.extend(
|
||||
# get_flood_risk(lat=property["LATITUDE"], lon=property["LONGITUDE"], radius_km=1)
|
||||
# )
|
||||
|
||||
resp = point_in_flood_area(lat=property["LATITUDE"], lon=property["LONGITUDE"])
|
||||
if resp:
|
||||
floor_warnings_data.append(
|
||||
{
|
||||
"uprn": property["uprn"],
|
||||
"address": property["address"],
|
||||
"postcode": property["postcode"],
|
||||
"area": resp
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
import plotly.graph_objects as go
|
||||
|
||||
labels = [
|
||||
"House_Cavity_Insulated_Pitched roof_Pre 1970",
|
||||
"House_Cavity_Insulated_Pitched roof_Post 1970",
|
||||
"House_Cavity_Uninsulated_Pitched roof_Pre 1970",
|
||||
"House_Cavity_Uninsulated_Pitched roof_Post 1970",
|
||||
"other",
|
||||
"House_System_Uninsulated_Pitched roof_Pre 1970",
|
||||
"House_Solid_Uninsulated_Not Pitched Roof_Pre 1970"
|
||||
]
|
||||
|
||||
values = [62, 36, 21, 16, 16, 4, 2]
|
||||
|
||||
hovertext = [
|
||||
"Loft insulation, draft proofing",
|
||||
"Top-up loft insulation",
|
||||
"Cavity wall insulation, loft insulation",
|
||||
"Cavity wall insulation, ventilation",
|
||||
"Bespoke retrofit measures",
|
||||
"External wall insulation, roof insulation",
|
||||
"Flat roof insulation, internal wall insulation"
|
||||
]
|
||||
|
||||
fig = go.Figure(go.Treemap(
|
||||
labels=labels,
|
||||
parents=[""] * len(labels), # No root
|
||||
values=values,
|
||||
hovertext=hovertext,
|
||||
hoverinfo="text",
|
||||
textinfo="none",
|
||||
marker=dict(
|
||||
line=dict(color="white", width=4),
|
||||
colors=values,
|
||||
colorscale="Blues"
|
||||
)
|
||||
))
|
||||
|
||||
fig.update_layout(
|
||||
margin=dict(t=10, l=10, r=10, b=10),
|
||||
plot_bgcolor="white",
|
||||
paper_bgcolor="white"
|
||||
)
|
||||
|
||||
fig.show()
|
||||
|
||||
# Get the recommended measures by scenario id
|
||||
recommendation_cols = [c for c in scenario_data[scenario_ids[1]].columns if "Recommendation:" in c]
|
||||
measure_counts_by_scenario = scenario_data[scenario_ids[1]].groupby("archetype_group")[
|
||||
recommendation_cols
|
||||
].sum().reset_index()
|
||||
|
||||
measure_counts_by_scenario.to_csv(
|
||||
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/MOD/Pilot Programme/measure_counts_by_scenario.csv"
|
||||
)
|
||||
|
||||
# Estimate average valuation improvement by scenario
|
||||
valuation_data = pd.read_csv(
|
||||
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/MOD/Pilot Programme/property_valuation.csv"
|
||||
)
|
||||
|
||||
from backend.ml_models.Valuation import PropertyValuation
|
||||
|
||||
uplift = []
|
||||
for _, x in valuation_data.iterrows():
|
||||
uprn = x["uprn"]
|
||||
|
||||
to_append = {"uprn": uprn}
|
||||
for _id in scenario_ids:
|
||||
scenario = scenario_data[_id][
|
||||
scenario_data[_id]["uprn"] == uprn
|
||||
].squeeze()
|
||||
|
||||
val = PropertyValuation.estimate_valuation_improvement(
|
||||
current_value=x["valuation"],
|
||||
current_epc=scenario["Current EPC Rating"].value,
|
||||
target_epc=scenario["Predicted Post Works EPC"],
|
||||
total_cost=None
|
||||
)
|
||||
|
||||
to_append[_id] = val["average_increase"]
|
||||
|
||||
uplift.append(to_append)
|
||||
|
||||
uplift = pd.DataFrame(uplift)
|
||||
print(uplift[scenario_ids[0]].mean())
|
||||
# £8,161
|
||||
print(uplift[scenario_ids[1]].mean())
|
||||
# £16,938
|
||||
|
|
|
|||
76
etl/customers/mod/pilot/3. Past Project Costs.py
Normal file
76
etl/customers/mod/pilot/3. Past Project Costs.py
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
"""Break the wave 2 measure costing workbook down into per-measure cost extracts."""
from pathlib import Path

import pandas as pd

# Folder holding the costing workbook; the CSV extracts are written alongside it
DATA_DIR = Path("/Users/khalimconn-kowlessar/Documents/hestia/Customers/MOD/Pilot Programme")

# Column names shared by every extract
MODEL_COL = "Model"
TOTAL_COL = "Total invoiced (including VAT)"


def extract_measure_costs(costs, label, suffix=""):
    """
    Pull the model/invoice column pair for one measure out of the costing sheet.

    :param costs: The costing data as read from the workbook
    :param label: Measure label; each model name is prefixed with "<label> - "
    :param suffix: Suffix on the measure's column names (e.g. ".2"). Presumably the suffix
        pandas adds when the sheet repeats a header - verify against the workbook.
    :return: A new two-column frame ("Model", "Total invoiced (including VAT)") holding only
        the rows that have an invoiced total; `costs` itself is not modified
    """
    model_col = f"{MODEL_COL}{suffix}"
    total_col = f"{TOTAL_COL}{suffix}"

    measure_costs = costs[[model_col, total_col]].copy()
    measure_costs[model_col] = f"{label} - " + measure_costs[model_col]
    measure_costs = measure_costs[measure_costs[total_col].notna()]
    # Rename so that the extracts for different measures can be concatenated
    measure_costs.columns = [MODEL_COL, TOTAL_COL]
    return measure_costs


def main():
    """
    Read the costing workbook, write the per-measure CSV extracts and return the mean total
    cost of works by property age.
    """
    # Get the wave 2 costing data and produce some breakdowns
    costs = pd.read_excel(DATA_DIR / "Measure cost study for MOD.xlsx", header=2)

    # TODO: Get the EPC data for these

    cwi_costs = extract_measure_costs(costs, "CWI")  # Cavity
    li_costs = extract_measure_costs(costs, "LI", ".2")  # Loft
    windows_costs = extract_measure_costs(costs, "Windows", ".3")
    doors_costs = extract_measure_costs(costs, "Doors", ".4")
    ashps_costs = extract_measure_costs(costs, "ASHP", ".5")
    solar_costs = extract_measure_costs(costs, "Solar", ".6")

    fabric_costing_data = pd.concat([cwi_costs, li_costs])
    windows_doors_costing_data = pd.concat([windows_costs, doors_costs])

    windows_doors_costing_data.to_csv(DATA_DIR / "windows_doors_costs.csv")
    fabric_costing_data.to_csv(DATA_DIR / "fabric_costing_data.csv")
    ashps_costs.to_csv(DATA_DIR / "ashps_costs.csv")
    solar_costs.to_csv(DATA_DIR / "solar_costs.csv")

    # NB: the trailing space in "Property age " is part of the column name
    project_cost_by_age = (
        costs[["Property age ", "TOTAL Cost of Works"]].groupby("Property age ").mean().reset_index()
    )
    return project_cost_by_age


if __name__ == "__main__":
    main()
|
||||
Loading…
Add table
Reference in a new issue