diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py
index 26682044..3bd2c84e 100644
--- a/backend/SearchEpc.py
+++ b/backend/SearchEpc.py
@@ -2,6 +2,7 @@ import os
 import time
 from epc_api.client import EpcClient
 from utils.logger import setup_logger
+from typing import List
 
 logger = setup_logger()
 
@@ -105,3 +106,61 @@ class SearchEpc:
                 "message": "Could not retrieve EPC data",
                 "error": str(e)
             }
+
+    def retrieve(self):
+        """
+        Given a successful search, split the EPC rows it returned into the newest EPC and the
+        older ones.
+
+        :return: tuple of (newest_epc, older_epcs)
+        :raises ValueError: if there is no search data yet
+        :raises NotImplementedError: if the rows do not share exactly one UPRN
+        """
+
+        if self.data is None:
+            raise ValueError("data is missing, run search first")
+
+        rows = self.data["rows"]
+
+        # We perform some checks on the rows
+        # Firstly, we should only have 1 uprn so if we have multiple, we'll need to filter down the
+        # property further
+        uprns = {r["uprn"] for r in rows}
+
+        if len(uprns) != 1:
+            raise NotImplementedError("More than one unique UPRN, need to handle this case")
+
+        # We now check for a full sap epc:
+        # NOTE: full_sap_epc is not used or returned yet
+        full_sap_epc = [r for r in rows if r["transaction-type"] == "new dwelling"]
+        full_sap_epc = full_sap_epc[0] if full_sap_epc else {}
+
+        # Finally, we identify the newest epc and the rest, and then return
+        newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)
+
+        return newest_epc, older_epcs
+
+    @staticmethod
+    def filter_newest_epc(list_of_epcs: List):
+        """
+        Split a list of EPC records into the most recently lodged one and all the others.
+
+        :param list_of_epcs: EPC records, each with "lodgement-datetime" and "lmk-key" keys
+        :return: tuple of (newest_epc, older_epcs)
+        :raises ValueError: if list_of_epcs is empty
+        :raises Exception: if more than one record has the newest lodgement datetime
+        """
+        if not list_of_epcs:
+            raise ValueError("No EPCs supplied, cannot identify the newest one")
+
+        # Work out the newest lodgement datetime once, rather than once per record
+        newest_datetime = max(x["lodgement-datetime"] for x in list_of_epcs)
+        newest_response = [r for r in list_of_epcs if r["lodgement-datetime"] == newest_datetime]
+        if len(newest_response) > 1:
+            raise Exception("More than one result found for this address - investigate me")
+
+        # The older EPCs are every record other than the newest one, matched on lmk-key
+        newest_epc = newest_response[0]
+        older_epcs = [epc for epc in list_of_epcs if epc["lmk-key"] != newest_epc["lmk-key"]]
+
+        return newest_epc, older_epcs
diff --git a/etl/eligibility/MeasureSuitibility.py b/etl/eligibility/MeasureSuitibility.py
new file mode 100644
index 00000000..a9368f67
--- /dev/null
+++ b/etl/eligibility/MeasureSuitibility.py
@@ -0,0 +1,20 @@
+class MeasureSuitibility:
+    """
+    Given the epc data about a property, this class holds the logic for determining if the home
+    is eligible for a specific retrofit measure.
+
+    For example, this could be whether the loft has insulation below a standardised threshold, or
+    if it has an empty cavity
+    """
+
+    # If the loft has less than 100mm of insulation, we classify the home as needing loft insulation
+    LOFT_INSULATION_THRESHOLD = 100
+
+    def __init__(self, epc, cleaned):
+        self.epc = epc
+        self.cleaned = cleaned
+
+        walls_description = self.epc["walls-description"]
+        # Get the cleaned version of the description
+
+    # def loft_insulation(self):
diff --git a/etl/eligibility/ha_15_32/app.py b/etl/eligibility/ha_15_32/app.py
index d037d610..34204aad 100644
--- a/etl/eligibility/ha_15_32/app.py
+++ b/etl/eligibility/ha_15_32/app.py
@@ -4,11 +4,15 @@ used by the Warmfront team, to identify which properties are eligible for ECO4 a
 work is being done in December 2023, prior to completion of acquisition
 """
 from pathlib import Path
+from tqdm import tqdm
 import pandas as pd
 import numpy as np
+import msgpack
 from utils.logger import setup_logger
+from utils.s3 import read_from_s3
 from dotenv import load_dotenv
 from backend.SearchEpc import SearchEpc
+from backend.Property import Property
 
 ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
 
@@ -321,3 +325,44 @@ def app():
 
     ha32, _ = marge_ha_32(asset_list=ha32_asset_list, identified_addresses=ha32_identified_addresses)
     ha15, _ = merge_ha_15(asset_list=ha15_asset_list, identified_addresses=ha15_identified_addresses)
+
+    ha32["row_id"] = ["h32" + str(i) for i in range(0, len(ha32))]
+    ha15["row_id"] = ["h15" + str(i) for i in range(0, len(ha15))]
+
+    cleaned = read_from_s3(
+        s3_file_name="cleaned_epc_data/cleaned.bson",
+        bucket_name="retrofit-data-dev"
+    )
+    cleaned = msgpack.unpackb(cleaned, raw=False)
+
+    # We want to retrieve EPCs for every single property
+
+    ha32_scoring_data = []
+    for _, house in tqdm(ha32.iterrows(), total=len(ha32)):
+        searcher = SearchEpc(
+            address1=" ".join([house["No."], house["Address"]]),
+            postcode=house["Postcode"]
+        )
+
+        searcher.search()
+
+        newest_epc, older_epcs = searcher.retrieve()
+        # We also want to get the penultimate epc
+        penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
+
+        from etl.eligibility.MeasureSuitibility import MeasureSuitibility
+        suitability = MeasureSuitibility(
+            epc=newest_epc, cleaned=cleaned
+        )
+
+        from pprint import pprint
+        len(searcher.data["rows"])
+
+        # TODO: Integegrate SearchEPC into the Property class
+        p = Property(
+            id=house["row_id"],
+            postcode=house["postcode"],
+            address1=house["address1"],
+            epc_client=None,
+            data=searcher.data
+        )