working on wft sales analysis

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-09 10:36:30 +00:00
parent b4d4c2128b
commit f78078384b
11 changed files with 261 additions and 99 deletions

View file

@ -150,7 +150,7 @@ class Property(Definitions):
"""
solar_pv = self.data["photo-supply"]
if solar_pv == "":
if solar_pv in ["", None]:
solar_pv = None
else:
solar_pv = float(solar_pv)
@ -170,6 +170,7 @@ class Property(Definitions):
"Y": True,
"N": False,
"": None,
None: None,
}
self.solar_hot_water = {
@ -245,8 +246,8 @@ class Property(Definitions):
# it
self.data["built-form"] = BUILT_FORM_REMAP.get(self.data["built-form"], self.data["built-form"])
if self.data["built-form"] in self.DATA_ANOMALY_MATCHES:
if self.data["property-type"] == "Flat":
self.data["built-form"] = "Semi-Detached"
if self.data["property-type"] in ["Flat", "Maisonette"]:
self.data["built-form"] = "End-Terrace"
self.set_year_built()
self.set_energy()
@ -394,7 +395,8 @@ class Property(Definitions):
map = {
"no corridor": False,
"unheated corridor": True,
"heated corridor": False
"heated corridor": False,
None: False
}
if self.data["heat-loss-corridor"] in self.DATA_ANOMALY_MATCHES:
@ -403,7 +405,7 @@ class Property(Definitions):
has_heat_loss_corridor = map[self.data["heat-loss-corridor"]]
length = self.data["unheated-corridor-length"]
if length == "":
if length in ["", None]:
length = None
else:
length = float(length)
@ -579,7 +581,7 @@ class Property(Definitions):
self.floor_area = float(self.data["total-floor-area"])
if not self.data["number-habitable-rooms"] or (
self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES
self.data["floor-height"] in ["", None] or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES
):
if self.property_dimensions is None:
property_dimensions = read_dataframe_from_s3_parquet(
@ -601,7 +603,7 @@ class Property(Definitions):
else:
raise NotImplementedError("Implement me")
if self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES:
if self.data["floor-height"] in [None, ""] or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES:
self.floor_height = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
else:
self.floor_height = float(self.data["floor-height"])
@ -626,7 +628,7 @@ class Property(Definitions):
def set_floor_level(self):
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]] if
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES else None
self.data["floor-level"] not in list(self.DATA_ANOMALY_MATCHES) + [None] else None
)
if self.floor_level is None:
@ -794,7 +796,7 @@ class Property(Definitions):
:return:
"""
if self.data["fixed-lighting-outlets-count"] == "":
if self.data["fixed-lighting-outlets-count"] in [None, ""]:
# We check old EPCs and the full SAP EPC

View file

@ -146,6 +146,7 @@ class SearchEpc:
max_retries: int = None,
uprn: [int, None] = None,
size=None,
property_type=None,
):
"""
Address lines 1 and postcode are mandatory fields. The other address lines are optional
@ -157,6 +158,7 @@ class SearchEpc:
:param uprn: int, optional, the uprn of the property
:param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's
default
:param property_type: str, optional, the property type of the property, if known beforehand
"""
self.address1 = address1
@ -184,6 +186,8 @@ class SearchEpc:
self.size = size if size is not None else 25
self.property_type = property_type
@classmethod
def get_house_number(cls, address: str) -> str | None:
"""
@ -335,7 +339,7 @@ class SearchEpc:
return address, postcode
def extract_epc_data(self, property_type=None, address=None):
def extract_epc_data(self, address=None):
"""
Given a successful search, this method will format the data and return it
@ -351,7 +355,7 @@ class SearchEpc:
# Firstly, we should only have 1 uprn so if we have multiple, we'll need to filter down the
# property further
rows = self.filter_rows(rows, property_type=property_type, address=None)
rows = self.filter_rows(rows, property_type=self.property_type, address=None)
rows = self.filter_rows(rows, property_type=None, address=address)
# We now check for a full sap epc:
@ -366,9 +370,19 @@ class SearchEpc:
# Get the uprn from the newest record for this home
uprns = {r["uprn"] for r in rows if r["uprn"]}
if len(uprns) != 1:
raise ValueError("Multiple UPRNs found - investigate me")
uprn = uprns.pop()
# We can sometimes have no uprn for a property
if (len(uprns) == 0) and len(rows) > 0:
logger.warning("Found data but missing uprn")
elif len(uprns) != 1:
# There is a possibility that we have multiple UPRNs for a single property, which is an error
addresses = {r["address"] for r in rows}
if len(addresses) == 1:
# Take the uprn from the most recent
uprns = {newest_epc["uprn"]}
else:
raise ValueError("Multiple UPRNs found - investigate me")
uprn = uprns.pop() if uprns else None
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
@ -670,6 +684,19 @@ class SearchEpc:
# Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
if skip_os:
if self.ordnance_survey_client.property_type is not None:
# We can try and estimate
estimated_epc = self.estimate_epc(
property_type=self.ordnance_survey_client.property_type,
built_form=self.ordnance_survey_client.built_form
)
self.newest_epc = estimated_epc
self.older_epcs = []
self.full_sap_epc = {}
# Finally, set a standardised address 1 and postcode
self.address_clean = self.ordnance_survey_client.address_os
self.postcode_clean = self.ordnance_survey_client.postcode_os
return
os_response = self.ordnance_survey_client.get_places_api()

View file

@ -333,7 +333,8 @@ class Eligibility:
"""
current_sap = int(self.epc["current-energy-efficiency"])
if current_sap > 54:
if current_sap >= 69:
self.eco4_warmfront = {
"eligible": False,
"message": "sap too high"
@ -347,7 +348,12 @@ class Eligibility:
is_eligible = self.cavity["suitability"] & self.loft["suitability"]
if post_retrofit_sap is None:
message = "subject to post retrofit sap" if is_eligible else "not eligible"
if current_sap >= 55:
message = "Possibly eligible but property currently EPC D"
else:
message = "subject to post retrofit sap" if is_eligible else "not eligible"
self.eco4_warmfront = {
"eligible": is_eligible,
"message": message

View file

@ -246,6 +246,8 @@ def merge_ha_15(asset_list, identified_addresses):
identified_addresses = identified_addresses.drop_duplicates("merge_key")
# We pull out raw counts for the survey lists
# Check asset list for dupes
asset_list_dupes = asset_list["merge_key"].duplicated()
if asset_list_dupes.sum():
@ -336,7 +338,8 @@ def merge_ha_15(asset_list, identified_addresses):
def prepare_model_data_row(
property_id, modelling_epc, cleaned, cleaning_data, created_at, old_data=None, full_sap_epc=None
property_id, modelling_epc, cleaned, cleaning_data, created_at,
photo_supply_lookup, floor_area_decile_thresholds, old_data=None, full_sap_epc=None,
):
"""
This function prepares the data for modelling, in the same fashion as the recommendation engine
@ -353,7 +356,8 @@ def prepare_model_data_row(
full_sap_epc=full_sap_epc
)
p.get_components(cleaned, None, None)
p.get_components(cleaned, photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds)
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]),

View file

@ -1,6 +1,6 @@
import os
import msgpack
import openpyxl
from openpyxl.styles.colors import COLOR_INDEX
from pathlib import Path
from datetime import datetime
import pandas as pd
@ -8,7 +8,7 @@ import numpy as np
from utils.s3 import read_from_s3
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from utils.s3 import read_dataframe_from_s3_parquet
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
@ -16,10 +16,12 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
import re
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
logger = setup_logger()
load_dotenv(ENV_FILE)
@ -250,24 +252,55 @@ def load_data():
return data, survey_list
def get_epc_data(data, cleaned, cleaning_data, created_at):
def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
scoring_data = []
results = []
nodata = []
for _, property_meta in tqdm(data.iterrows(), total=len(data)):
property_type_lookup = {
'Semi Detached Bungalow': {"property-type": "Bungalow", "built-form": "Semi-Detached"},
'Mid Terraced House': {"property-type": "House", "built-form": "Mid-Terrace"},
'End Terraced House': {"property-type": "House", "built-form": "End-Terrace"},
'Low Rise Flat': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Semi-Detached House': {"property-type": "House", "built-form": "Semi-Detached"},
'Detached Bungalow': {"property-type": "Bungalow", "built-form": "Detached"},
'End Terraced Bungalow': {"property-type": "Bungalow", "built-form": "End-Terrace"},
'Mid Terraced Bungalow': {"property-type": "Bungalow", "built-form": "Mid-Terrace"},
'Medium Rise Flat': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Detached House': {"property-type": "House", "built-form": "Detached"},
'Cottage Flat': {"property-type": "Flat", "built-form": "Semi-Detached"},
'Maisonette Medium Rise': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Maisonette Over Shop': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'End Terraced Town House': {"property-type": "House", "built-form": "End-Terrace"},
'Flat Over Shop': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Mid Terraced Town House': {"property-type": "House", "built-form": "Mid-Terrace"},
}
for index, property_meta in tqdm(data.iterrows(), total=len(data)):
searcher = SearchEpc(
address1=property_meta["HouseNo"],
postcode=property_meta["Postcode"],
size=1000
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
full_address=property_meta["Address"]
)
searcher.search()
searcher.ordnance_survey_client.property_type = property_type_lookup[property_meta["Type"]]["property-type"]
searcher.ordnance_survey_client.built_form = property_type_lookup[property_meta["Type"]]["built-form"]
searcher.find_property(skip_os=True)
if searcher.data is None:
if searcher.newest_epc is None:
nodata.append(property_meta)
continue
newest_epc, older_epcs, full_sap_epc = searcher.retrieve(address=property_meta["Address"])
if searcher.newest_epc.get("estimated"):
# We insert the row ID as our proxy for UPRN
proxy_uprn = int(property_meta["row_id"].split("_")[1])
searcher.newest_epc["uprn"] = proxy_uprn
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
@ -277,16 +310,14 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront) and (
property_meta["warmfront_identified"]
):
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
# If this is the case, we need to update the older epcs
older_epcs = [
x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
]
# We don't update just to make data cleaning easier
if penultimate_epc.get("estimated") is None:
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
# Full checks
eligibility.check_gbis()
@ -303,7 +334,9 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
scoring_data.extend(scoring_dictionary)
@ -433,6 +466,18 @@ def analyse_results(results_df, data, survey_list):
how="left", on="survey_key"
)
all_identified_eco = analysis_data[
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
["ECO4 A/W", "AFFORDABLE WARMTH"])) |
(analysis_data["eco4_eligible"])
]
all_identified_gbis = analysis_data[
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
["ECO4 GBIS (ECO+)"])) |
(analysis_data["gbis_eligible"] & analysis_data["eco4_eligible"].isin([False, None]))
]
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
# Of the ECO jobs, what proportion do we get right
@ -482,17 +527,22 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
created_at = datetime.now().isoformat()
results_df, scoring_data, nodata = get_epc_data(data, cleaned, cleaning_data, created_at)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
results_df, scoring_data, nodata = get_epc_data(
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
)
# Store
# Old file was ha16.pickle
# import pickle
# with open("ha16.pickle", "wb") as f:
# with open("ha16_8_jan_2.pickle", "wb") as f:
# pickle.dump(
# {
# "scoring_data": scoring_data,
@ -500,3 +550,11 @@ def app():
# "nodata": nodata
# }, f
# )
# Read pickle
# import pickle
# with open("ha16.pickle", "rb") as f:
# saved = pickle.load(f)
# scoring_data = saved["scoring_data"]
# results_df = saved["results"]
# nodata = saved["nodata"]

View file

@ -1,14 +1,13 @@
import os
import msgpack
import openpyxl
from openpyxl.styles.colors import COLOR_INDEX
from pathlib import Path
from datetime import datetime
import pandas as pd
import numpy as np
from utils.s3 import read_from_s3
from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
@ -16,9 +15,9 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
import re
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
logger = setup_logger()
@ -170,24 +169,46 @@ def load_data():
return data, survey_list
def get_epc_data(data, cleaned, cleaning_data, created_at):
def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
scoring_data = []
results = []
nodata = []
property_type_lookup = {
"01 HOUSE": "House",
"02 FLAT": "Flat",
"03 BUNGALOW": "Bungalow",
"05 BEDSIT": "Flat",
"04 MAISONETTE": "Maisonette",
"01 HOUSE MID": "House",
"10 PBUNGALOW": "Bungalow",
"14 SFLAT": "Flat",
"12 SBEDSIT": "Flat",
"11 PFLAT": "Flat",
"13 SBUNGALOW": "Bungalow",
" 01 HOUSE MID": "House",
"09 PBEDSIT": "Flat"
}
for _, property_meta in tqdm(data.iterrows(), total=len(data)):
searcher = SearchEpc(
address1=property_meta["HouseNo"],
postcode=property_meta["Postcode"],
size=1000
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
full_address=property_meta["Address"]
)
searcher.search()
searcher.ordnance_survey_client.property_type = property_type_lookup[property_meta["Property Type"]]
searcher.find_property(skip_os=True)
if searcher.data is None:
if searcher.newest_epc is None:
nodata.append(property_meta)
continue
newest_epc, older_epcs, full_sap_epc = searcher.retrieve(address=property_meta["Address"])
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
@ -197,23 +218,25 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront) and (
property_meta["warmfront_identified"]
):
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
# If this is the case, we need to update the older epcs
older_epcs = [
x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
]
# older_epcs = [
# x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
# ]
# If this is the case, we need to update the older epcs
# We don't update just to make data cleaning easier
if penultimate_epc.get("estimated") is None:
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
# Full checks
eligibility.check_gbis()
eligibility.check_eco4()
if eligibility.eco4_warmfront["eligible"]:
if eligibility.epc["uprn"] == "":
if eligibility.epc["uprn"] in ["", None]:
eligibility.epc["uprn"] = int(property_meta["row_id"].split("_")[1])
scoring_dictionary = prepare_model_data_row(
@ -223,7 +246,9 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
scoring_data.extend(scoring_dictionary)
@ -277,7 +302,7 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
scoring_df = DataProcessor.clean_efficiency_variables(scoring_df)
scoring_df["UPRN"] = scoring_df["UPRN"].astype(int)
model_api = ModelApi(portfolio_id="ha33-eligibility", timestamp=created_at)
model_api = ModelApi(portfolio_id="ha24-eligibility", timestamp=created_at)
all_predictions = model_api.predict_all(
df=scoring_df,
bucket="retrofit-data-dev",
@ -353,6 +378,18 @@ def analyse_results(results_df, data, survey_list):
how="left", on="survey_key"
)
all_identified_eco = analysis_data[
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
["ECO4 A/W"])) |
(analysis_data["eco4_eligible"])
]
all_identified_gbis = analysis_data[
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
["ECO4 GBIS (ECO+)"])) |
(analysis_data["gbis_eligible"] & analysis_data["eco4_eligible"].isin([False, None]))
]
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
# Of the ECO jobs, what proportion do we get right
@ -403,17 +440,21 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
created_at = datetime.now().isoformat()
results_df, scoring_data, nodata = get_epc_data(data, cleaned, cleaning_data, created_at)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
results_df, scoring_data, nodata = get_epc_data(
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
)
# Pickle results just in case
# import pickle
# with open("ha24.pickle", "wb") as f:
# with open("ha24_8_jan.pickle", "wb") as f:
# pickle.dump(
# {
# "scoring_data": scoring_data,
@ -421,3 +462,11 @@ def app():
# "nodata": nodata
# }, f
# )
# Read in pickle
# import pickle
# with open("ha24_8_jan.pickle", "rb") as f:
# saved = pickle.load(f)
# scoring_data = saved["scoring_data"]
# results_df = saved["results"]
# nodata = saved["nodata"]

View file

@ -16,6 +16,7 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
@ -67,12 +68,16 @@ def load_data():
return df
def get_ha7_data(data, cleaned, cleaning_data, created_at):
def get_ha7_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
property_type_lookup = {
"Mid Terrace": "Mid-Terrace",
"End Terrace": "End-Terrace",
"Semi Detached": "Semi-Detached",
"Detached": "Detached",
# "Mid Terrace": "Mid-Terrace",
# "End Terrace": "End-Terrace",
# "Semi Detached": "Semi-Detached",
# "Detached": "Detached",
"House": "House",
"Flat": "Flat",
"Bungalow": "Bungalow",
"Maisonette": "Maisonette",
}
scoring_data = []
@ -80,7 +85,7 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at):
nodata = []
for _, house in tqdm(data.iterrows(), total=len(data)):
if house["Address"] is not None:
if house["Address"]:
address = house["Address"]
else:
address = house["Address2"]
@ -89,7 +94,8 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at):
address1=address,
postcode=house["Postcode"],
auth_token=EPC_AUTH_TOKEN,
os_api_key=None
os_api_key=None,
property_type=property_type_lookup.get(house["Archetype"]),
)
searcher.find_property(skip_os=True)
@ -118,7 +124,9 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at):
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
scoring_data.extend(scoring_dictionary)
@ -285,9 +293,13 @@ def app():
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
created_at = datetime.now().isoformat()
results_df, scoring_data, nodata = get_ha7_data(data, cleaned, cleaning_data, created_at)
results_df, scoring_data, nodata = get_ha7_data(
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
)
# Pickle results
# import pickle

View file

@ -210,7 +210,20 @@ class SolarPhotoSupply:
]
if photo_supply_matched.empty:
raise ValueError("No photo supply matched")
# There are a small number of cases where we don't get a full match so try again with a more aggregated
# average
photo_supply_matched = photo_supply_lookup[
(photo_supply_lookup["tenure"] == tenure) &
(photo_supply_lookup["built_form"] == built_form) &
(photo_supply_lookup["property_type"] == property_type)
]
if construction_age_band in photo_supply_matched["construction_age_band"].values:
photo_supply_matched = photo_supply_matched[
photo_supply_matched["construction_age_band"] == construction_age_band
]
if photo_supply_matched.empty:
raise ValueError("No photo supply matches")
floor_area_decile = cls.classify_floor_area(
floor_area, floor_area_decile_thresholds["floor_area_decile_thresholds"].values

View file

@ -1,5 +1,4 @@
from backend.Property import Property
from unittest.mock import Mock
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.tests.test_data.materials import materials
@ -7,7 +6,7 @@ from recommendations.tests.test_data.materials import materials
class TestVentilationRecommendations:
def test_natural_ventilation(self):
input_property1 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property1.data = {"mechanical-ventilation": "natural"}
recommender = VentilationRecommendations(
@ -28,7 +27,7 @@ class TestVentilationRecommendations:
assert recommender.recommendation[0]["parts"][0]["quantity"] == 2
def test_missing_ventilation(self):
input_property2 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property2 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property2.data = {"mechanical-ventilation": None}
recommender2 = VentilationRecommendations(
@ -49,7 +48,7 @@ class TestVentilationRecommendations:
assert recommender2.recommendation[0]["parts"][0]["quantity"] == 2
def test_nodata_ventilation(self):
input_property3 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property3 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property3.data = {"mechanical-ventilation": "NO DATA!!"}
recommender3 = VentilationRecommendations(
@ -70,7 +69,7 @@ class TestVentilationRecommendations:
assert recommender3.recommendation[0]["parts"][0]["quantity"] == 2
def test_existing_ventilation_1(self):
input_property4 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property4 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property4.data = {"mechanical-ventilation": 'mechanical, extract only'}
recommender4 = VentilationRecommendations(
@ -86,7 +85,7 @@ class TestVentilationRecommendations:
assert recommender4.has_ventilaion
def test_existing_ventilation_2(self):
input_property5 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property5 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property5.data = {"mechanical-ventilation": 'mechanical, supply and extract'}
recommender5 = VentilationRecommendations(

View file

@ -231,7 +231,7 @@ class TestWallRecommendationsBase:
class TestCavityWallRecommensations:
def test_fill_empty_cavity(self):
input_property = Property(id=1, postcode="F4k3", address1="123 fake street", epc_client=Mock())
input_property = Property(id=1, postcode="F4k3", address="123 fake street")
input_property.walls = {
'original_description': 'Cavity wall, as built, no insulation (assumed)',
'clean_description': 'Cavity wall, as built, no insulation',
@ -265,7 +265,7 @@ class TestCavityWallRecommensations:
assert np.isclose(recommender.recommendations[1]["total"], 2004.6600000000003)
def test_fill_partial_filled_cavity(self):
input_property = Property(id=1, postcode="F4k3", address1="123 fake street", epc_client=Mock())
input_property = Property(id=1, postcode="F4k3", address="123 fake street")
input_property.walls = {
'original_description': 'Cavity wall, as built, partial insulation (assumed)',
'clean_description': 'Cavity wall, as built, partial insulation',
@ -299,7 +299,7 @@ class TestCavityWallRecommensations:
assert np.isclose(recommender.recommendations[1]["total"], 1999.9350000000002)
def test_system_built_wall(self):
input_property2 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property2 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property2.walls = {
'original_description': 'System built, as built, no insulation (assumed)',
'clean_description': 'System built, as built, no insulation',
@ -346,7 +346,7 @@ class TestCavityWallRecommensations:
assert recommender2.recommendations[6]["parts"][0]["depth"] == 52.5
def test_timber_frame_wall(self):
input_property3 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property3 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property3.walls = {
'original_description': 'Timber frame, as built, no insulation (assumed)',
'clean_description': 'Timber frame, as built, no insulation',
@ -388,7 +388,7 @@ class TestCavityWallRecommensations:
assert recommender3.recommendations[1]["parts"][0]["depth"] == 150.0
def test_granite_or_whinstone_wall(self):
input_property4 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property4 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property4.walls = {
'original_description': 'Granite or whinstone, as built, no insulation (assumed)',
'clean_description': 'Granite or whinstone, as built, no insulation',
@ -430,7 +430,7 @@ class TestCavityWallRecommensations:
assert recommender4.recommendations[1]["parts"][0]["depth"] == 150
def test_cob_wall(self):
input_property5 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property5 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property5.walls = {
'original_description': 'Cob, as built',
'clean_description': 'Cob, as built',
@ -472,7 +472,7 @@ class TestCavityWallRecommensations:
assert recommender5.recommendations[3]["parts"][0]["depth"] == 100
def test_sandstone_or_limestone_wall(self):
input_property6 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property6 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property6.walls = {
'original_description': 'Sandstone or limestone, as built, no insulation (assumed)',
'clean_description': 'Sandstone or limestone, as built, no insulation',

View file

@ -1,6 +1,5 @@
from recommendations.WindowsRecommendations import WindowsRecommendations
from backend.Property import Property
from unittest.mock import Mock
from recommendations.tests.test_data.materials import materials
@ -15,8 +14,7 @@ class TestWindowRecommendations:
property_1 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 0
@ -52,8 +50,7 @@ class TestWindowRecommendations:
property_2 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 33
@ -86,8 +83,7 @@ class TestWindowRecommendations:
property_3 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 80
@ -110,8 +106,7 @@ class TestWindowRecommendations:
property_4 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 100
@ -134,8 +129,7 @@ class TestWindowRecommendations:
property_5 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 50
@ -164,8 +158,7 @@ class TestWindowRecommendations:
property_6 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 0
@ -199,8 +192,7 @@ class TestWindowRecommendations:
property_7 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 100
@ -227,11 +219,11 @@ class TestWindowRecommendations:
property_8 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 80
"multi-glaze-proportion": 80,
"uprn": 1
}
)
property_8.windows = {'original_description': 'Mostly triple glazing', 'has_glazing': True,