Pulling together ha15 ha 32 scoring pipeline

This commit is contained in:
Khalim Conn-Kowlessar 2023-12-07 13:04:15 +00:00
parent 8402088b3f
commit e1c066ea8d
6 changed files with 142 additions and 32 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (model_data)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="ha_15_32_eligibility" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyNamespacePackagesService">

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (model_data)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="ha_15_32_eligibility" project-jdk-type="Python SDK" />
<component name="PythonCompatibilityInspectionAdvertiser">
<option name="version" value="3" />
</component>

View file

@ -134,7 +134,7 @@ class SearchEpc:
# Finally, we identify the newest epc and the rest, and then return
newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)
return newest_epc, older_epcs
return newest_epc, older_epcs, full_sap_epc
@staticmethod
def filter_newest_epc(list_of_epcs: List):
@ -142,9 +142,13 @@ class SearchEpc:
r for r in list_of_epcs if
r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in list_of_epcs])
]
if len(newest_response) > 1:
if not newest_response:
return {}, []
if len(newest_response) != 1:
raise Exception("More than one result found for this address - investigate me")
older_epcs = [epc for epc in ["rows"] if epc["lmk-key"] != newest_response[0]["lmk-key"]]
older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest_response[0]["lmk-key"]]
return newest_response[0], older_epcs

View file

@ -1,3 +1,8 @@
from recommendations.recommendation_utils import convert_thickness_to_numeric
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
class MeasureSuitibility:
"""
Given the epc data about a property, this class holds the logic for determining if the home
@ -14,7 +19,65 @@ class MeasureSuitibility:
self.epc = epc
self.cleaned = cleaned
walls_description = self.epc["walls-description"]
# Get the cleaned version of the description
self.walls = self.parse_fabric("walls-description")
self.roof = self.parse_fabric("roof-description")
# def loft_insulation(self):
def parse_fabric(self, key):
    """
    Return the parsed attributes for one fabric description of the epc.

    Descriptions that quote a "thermal transmittance" are parsed on the fly with the matching attributes
    class. Every other description is looked up in the pre-cleaned data held in self.cleaned.

    :param key: String. Name of the epc field to parse, e.g. "walls-description" or "roof-description"
    :return: The result of process() for thermal transmittance descriptions, otherwise the first entry of
             self.cleaned[key] whose "original_description" equals the epc description
    :raises ValueError: If the description quotes a thermal transmittance but key is neither
                        "walls-description" nor "roof-description"
    :raises IndexError: If no entry in self.cleaned[key] matches the epc description
    """
    description = self.epc[key]

    if "thermal transmittance" in description:
        if key == "walls-description":
            return WallAttributes(description).process()
        if key == "roof-description":
            return RoofAttributes(description).process()
        raise ValueError("Invalid Key")

    # Get the cleaned version of the description, stopping at the first match
    for data in self.cleaned[key]:
        if data["original_description"] == description:
            return data

    # IndexError is used for a failed lookup so that callers handling a missing entry keep working;
    # the message names the key and the description that could not be found
    raise IndexError(f"No cleaned {key} entry found for description: {description!r}")
def loft_insulation(self, loft_thickness_threshold: int = None):
    """
    Decide whether the property's roof is a candidate for loft insulation.

    Only a loft, i.e. a pitched roof that is not a roof room, can be suitable. A loft is deemed suitable
    when its existing insulation thickness does not exceed the threshold.

    :param loft_thickness_threshold: Integer, Optional. Thickness above which the existing insulation is
                                     considered thick enough. If this parameter is not provided, the
                                     method falls back to LOFT_INSULATION_THRESHOLD
    :return: Dictionary with the keys "suitablility" (Boolean) and "thickness" (the value returned by
             convert_thickness_to_numeric, or None when the roof is not a loft)
    """
    threshold = loft_thickness_threshold
    if threshold is None:
        threshold = self.LOFT_INSULATION_THRESHOLD

    # Anything that is not a pitched roof, or is a roof room, is not a loft and is ruled out straight away
    if not self.roof["is_pitched"] or self.roof["is_roof_room"]:
        return {"suitablility": False, "thickness": None}

    # For a loft, convert the textual thickness to a value we can compare against the threshold
    insulation_thickness = convert_thickness_to_numeric(
        string_thickness=self.roof["insulation_thickness"],
        is_pitched=self.roof["is_pitched"],
        is_flat=self.roof["is_flat"],
    )

    # Insulation strictly thicker than the threshold is already thick enough
    already_thick_enough = insulation_thickness > threshold
    return {
        "suitablility": not already_thick_enough,
        "thickness": insulation_thickness,
    }

View file

@ -13,6 +13,9 @@ from utils.s3 import read_from_s3
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from backend.Property import Property
from etl.eligibility.MeasureSuitibility import MeasureSuitibility
from etl.epc.DataProcessor import DataProcessor
from backend.app.utils import read_parquet_from_s3
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
@ -335,34 +338,68 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
# We want to retrieve EPCs for every single property
ha_data = ha32
house_number_key = "Dwelling num"
address_key = "Street"
postcode_key = "Postcode"
ha32_scoring_data = []
for _, house in tqdm(ha32.iterrows(), total=len(ha32)):
searcher = SearchEpc(
address1=" ".join([house["No."], house["Address"]]),
postcode=house["Postcode"]
)
def get_data(ha_data, house_number_key, address_key, postcode_key):
ha_scoring_data = []
for _, house in tqdm(ha_data.iterrows(), total=len(ha_data)):
searcher = SearchEpc(
address1=" ".join([house[house_number_key], house[address_key]]),
postcode=house[postcode_key]
)
searcher.search()
searcher.search()
newest_epc, older_epcs = searcher.retrieve()
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
newest_epc, older_epcs, _ = searcher.retrieve()
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
penultimate_epc = newest_epc
from etl.eligibility.MeasureSuitibility import MeasureSuitibility
suitability = MeasureSuitibility(
epc=newest_epc, cleaned=cleaned
)
suitability = MeasureSuitibility(
epc=newest_epc, cleaned=cleaned
)
suitable = suitability.loft_insulation()
from pprint import pprint
len(searcher.data["rows"])
modelling_epc = newest_epc.copy()
if not suitable["suitablility"]:
# if unsuccessful with newest EPC, try penultimate
suitability = MeasureSuitibility(
epc=penultimate_epc, cleaned=cleaned
)
suitable = suitability.loft_insulation()
modelling_epc = penultimate_epc.copy()
# TODO: Integrate SearchEpc into the Property class
p = Property(
id=house["row_id"],
postcode=house["postcode"],
address1=house["address1"],
epc_client=None,
data=searcher.data
)
if not suitable["suitablility"]:
raise ValueError("DO SOMETHING")
p = Property(
id=house["row_id"],
postcode=modelling_epc["postcode"],
address1=modelling_epc["address1"],
epc_client=None,
data=modelling_epc
)
################################################################################
# Prepare the data for modelling, in the same fashion as the engine
################################################################################
p.get_components(cleaned)
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
p.set_number_lighting_outlets(cleaned_property_data)
from pprint import pprint
len(searcher.data["rows"])

View file

@ -1,3 +1,9 @@
pandas
pydantic==1.10.11
epc-api-python==1.0.2
msgpack
tqdm
python-dotenv
boto3
textblob
pyarrow==12.0.1