implementing new sap model data prep into backend

This commit is contained in:
Khalim Conn-Kowlessar 2023-10-10 11:37:56 +08:00
parent efdef5eb46
commit de9810af43
7 changed files with 154 additions and 40 deletions

View file

@ -59,9 +59,11 @@ class Property(Definitions):
self.year_built = None
self.number_of_rooms = None
self.age_band = None
self.construction_age_band = None
self.number_of_floors = None
self.perimeter = None
self.wall_type = None
self.floor_type = None
self.energy = None
self.ventilation = None
@ -298,6 +300,7 @@ class Property(Definitions):
setattr(self, self.ATTRIBUTE_MAP[description], attributes[0])
self.set_wall_type()
self.set_floor_type()
def set_age_band(self):
"""
@ -308,8 +311,8 @@ class Property(Definitions):
if not self.data:
raise ValueError("Property does not contain data")
construction_age_band = DataProcessor.clean_construction_age_band(self.data["construction-age-band"])
self.age_band = england_wales_age_band_lookup.get(construction_age_band)
self.construction_age_band = DataProcessor.clean_construction_age_band(self.data["construction-age-band"])
self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)
def set_spatial(self, spatial: pd.DataFrame):
"""
@ -570,6 +573,13 @@ class Property(Definitions):
"""
self.wall_type = get_wall_type(**self.walls)
def set_floor_type(self):
    """
    Store the floor type of the property on ``self.floor_type``, which is used for
    calculating u-values. The type is "suspended" when ``self.floor["is_suspended"]``
    is truthy and "solid" otherwise.
    :return:
    """
    if self.floor["is_suspended"]:
        self.floor_type = "suspended"
    else:
        self.floor_type = "solid"
@staticmethod
def _extract_component(component_data, component_rename_cols, component_drop_cols, rename_prefix=None):
for k in component_rename_cols:
@ -647,7 +657,7 @@ class Property(Definitions):
"CARBON": self.data["co2-emissions-current"],
"HEAT_DEMAND": self.data["energy-consumption-current"],
"estimated_perimeter": self.perimeter,
"CONSTRUCTION_AGE_BAND": self.age_band,
"CONSTRUCTION_AGE_BAND": self.construction_age_band,
"FLOOR_HEIGHT": self.floor_height,
"NUMBER_HABITABLE_ROOMS": self.number_of_rooms,
"TOTAL_FLOOR_AREA": self.floor_area,

View file

@ -156,6 +156,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# Finally, we'll prepare data for predicting the impact on SAP
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
data_processor.pre_process()
starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
@ -174,6 +175,21 @@ async def trigger_plan(body: PlanTriggerRequest):
fixed_data=fixed_data,
)
fer
for col in scoring_dict.keys():
if col in [
"UPRN", "id", "LOCAL_AUTHORITY",
]:
continue
if col in ["SAP_STARTING", "HEAT_DEMAND_STARTING", "CARBON_STARTING", "FLOOR_HEIGHT_STARTING"]:
if scoring_dict[col]:
unique_vals = sap_change_dataset[col].unique()
if scoring_dict[col] not in unique_vals:
blah
recommendations_scoring_data.append(scoring_dict)
# cleanup

View file

@ -4,7 +4,7 @@ from collections import defaultdict
from utils.s3 import read_from_s3
from recommendations.config import UPGRADES_MAP
from recommendations.recommendation_utils import get_wall_u_value, get_floor_u_value
from recommendations.recommendation_utils import get_wall_u_value, get_floor_u_value, get_roof_u_value
from backend.app.db.utils import row2dict
from backend.app.config import get_settings
@ -86,19 +86,53 @@ def create_recommendation_scoring_data(
**fixed_data.to_dict("records")[0]
}
# Set starting u-values if we don't have them
if not scoring_dict["walls_thermal_transmittance"]:
scoring_dict["walls_thermal_transmittance"] = get_wall_u_value(
clean_description=property.walls["clean_description"],
age_band=property.age_band,
is_granite_or_whinstone=property.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=property.walls["is_sandstone_or_limestone"]
)
if not scoring_dict["floor_thermal_transmittance"]:
scoring_dict["floor_thermal_transmittance"] = get_floor_u_value(
floor_type=property.floor_type,
area=property.floor_area,
perimeter=property.perimeter,
wall_type=property.wall_type,
insulation_thickness=property.floor["insulation_thickness"],
age_band=property.age_band,
)
if not scoring_dict["roof_thermal_transmittance"]:
scoring_dict["roof_thermal_transmittance"] = get_roof_u_value(
insulation_thickness=property.roof["insulation_thickness"],
has_dwelling_above=property.roof["has_dwelling_above"],
is_loft=property.roof["is_loft"],
is_roof_room=property.roof["is_roof_room"],
is_thatched=property.roof["is_thatched"],
age_band=property.age_band,
is_flat=property.roof["is_flat"],
is_pitched=property.roof["is_pitched"],
is_at_rafters=property.roof["is_at_rafters"],
)
# Tidy up insulation thicknesses, making sure it isn't None
if scoring_dict["walls_insulation_thickness"] is None:
    scoring_dict["walls_insulation_thickness"] = "none"
if scoring_dict["floor_insulation_thickness"] is None:
    scoring_dict["floor_insulation_thickness"] = "none"
if scoring_dict["roof_insulation_thickness"] is None:
    # Default the roof's own key: writing to the floor key here would overwrite a valid
    # floor thickness and leave the roof entry as None
    scoring_dict["roof_insulation_thickness"] = "none"
# We update the description to indicate it's insulated
if recommendation["type"] == "wall_insulation":
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
# We may not have the u-value initially, so we calculate it
if not scoring_dict["walls_thermal_transmittance"]:
scoring_dict["walls_thermal_transmittance"] = get_wall_u_value(
clean_description=property.walls["clean_description"],
age_band=property.age_band,
is_granite_or_whinstone=property.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=property.walls["is_sandstone_or_limestone"]
)
scoring_dict["walls_thermal_transmittance_ENDING"] = get_wall_u_value(
clean_description=UPGRADES_MAP[property.walls["clean_description"]],
age_band=property.age_band,
@ -106,11 +140,64 @@ def create_recommendation_scoring_data(
is_sandstone_or_limestone=property.walls["is_sandstone_or_limestone"]
)
scoring_dict["walls_insulation_thickness_ENDING"] = "above average"
elif recommendation["type"] == "floor_insulation":
blah
scoring_dict["FLOOR_DESCRIPTION_ENDING"] = UPGRADES_MAP[property.floor["clean_description"]]
else:
if not scoring_dict["walls_thermal_transmittance_ENDING"]:
scoring_dict["walls_thermal_transmittance_ENDING"] = get_wall_u_value(
clean_description=property.walls["clean_description"],
age_band=property.age_band,
is_granite_or_whinstone=property.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=property.walls["is_sandstone_or_limestone"]
)
if scoring_dict["walls_insulation_thickness_ENDING"] is None:
scoring_dict["walls_insulation_thickness_ENDING"] = "none"
# Update description to indicate it's insulated
if recommendation["type"] == "floor_insulation":
if len(recommendation["parts"]) > 1:
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
scoring_dict["floor_thermal_transmittance_ENDING"] = get_floor_u_value(
floor_type=property.floor_type,
area=property.floor_area,
perimeter=property.perimeter,
wall_type=property.wall_type,
insulation_thickness=recommendation["parts"][0]["depths"][0],
age_band=property.age_band,
)
scoring_dict["floor_insulation_thickness_ENDING"] = "above average"
else:
if not scoring_dict["floor_thermal_transmittance_ENDING"]:
scoring_dict["floor_thermal_transmittance_ENDING"] = get_floor_u_value(
floor_type=property.floor_type,
area=property.floor_area,
perimeter=property.perimeter,
wall_type=property.wall_type,
insulation_thickness=property.floor["insulation_thickness"],
age_band=property.age_band,
)
if scoring_dict["floor_insulation_thickness_ENDING"] is None:
scoring_dict["floor_insulation_thickness_ENDING"] = "none"
if recommendation["type"] not in ["wall_insulation", "floor_insulation"]:
raise NotImplementedError("Implement me")
if not scoring_dict["roof_thermal_transmittance_ENDING"]:
scoring_dict["roof_thermal_transmittance_ENDING"] = get_roof_u_value(
insulation_thickness=property.roof["insulation_thickness"],
has_dwelling_above=property.roof["has_dwelling_above"],
is_loft=property.roof["is_loft"],
is_roof_room=property.roof["is_roof_room"],
is_thatched=property.roof["is_thatched"],
age_band=property.age_band,
is_flat=property.roof["is_flat"],
is_pitched=property.roof["is_pitched"],
is_at_rafters=property.roof["is_at_rafters"],
)
if scoring_dict["roof_insulation_thickness_ENDING"] is None:
scoring_dict["roof_insulation_thickness_ENDING"] = "none"
return scoring_dict

View file

@ -162,18 +162,6 @@ class DataProcessor:
break
to_index -= 1
def reformat_columns(self):
    """
    Rename the columns of ``self.data`` from lower case to capitalised format.
    When requesting the epc data from the api, the columns are lower case
    and separated by a hyphen, whereas in the bulk download, the columns
    are capitalised and separated by underscores. Every column name is
    upper-cased and its hyphens are replaced with underscores.
    :return:
    """
    capitalised = []
    for name in self.data.columns:
        capitalised.append(name.upper().replace("-", "_"))
    self.data.columns = capitalised
def pre_process(self) -> pd.DataFrame:
"""
Load data and begin initial cleaning
@ -181,22 +169,24 @@ class DataProcessor:
if self.data is None:
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
if self.newdata:
self.reformat_columns()
if not self.newdata:
self.confine_data()
self.remap_columns()
# We have some non-standard construction age bands which we'll clean for matching
self.standardise_construction_age_band()
self.clean_missing_rooms()
if not self.newdata:
self.standardise_construction_age_band()
self.clean_missing_rooms()
self.recast_df_columns(
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
)
self.clean_multi_glaze_proportion()
if not self.newdata:
self.clean_multi_glaze_proportion()
self.clean_photo_supply()
if not self.newdata:
@ -208,16 +198,24 @@ class DataProcessor:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
if not self.newdata:
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# Final re-casting after data transformed and prepared
self.data = self.data.astype(COLUMNTYPES)
coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.newdata else COLUMNTYPES
self.data = self.data.astype(coltypes)
self.na_remapping()
return self.data
def na_remapping(self):
for column, fill_value in fill_na_map.items():
fill_na_map_apply = {
k: v for k, v in fill_na_map.items() if k in self.data.columns
} if self.newdata else fill_na_map
for column, fill_value in fill_na_map_apply.items():
self.data[column] = self.data[column].fillna(fill_value)
def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON):
@ -260,7 +258,8 @@ class DataProcessor:
data = data.replace(np.NAN, None)
# Remap certain columns
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
if not self.newdata:
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
convert_to_lower = ["TRANSACTION_TYPE"]

View file

@ -11,7 +11,6 @@ from etl.epc.settings import (
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON,
EARLIEST_EPC_DATE,
CARBON_RESPONSE,
)
from etl.epc.DataProcessor import DataProcessor

View file

@ -79,7 +79,7 @@ class FloorRecommendations(Definitions):
return
u_value = get_floor_u_value(
floor_type="suspended" if is_suspended else "solid",
floor_type=self.property.floor_type,
area=float(self.property.data["total-floor-area"]),
perimeter=self.property.perimeter,
age_band=self.property.age_band,

View file

@ -447,6 +447,9 @@ def extract_insulation_thickness(insulation_thickness_str):
if insulation_thickness_str in ["none", "average", "below average", "above average", None]:
return None
if isinstance(insulation_thickness_str, (float, int)):
return insulation_thickness_str
return int(insulation_thickness_str.replace("mm", ""))