From 0fad758fbbccba9acf08dd9d1bbcdbca2f5a23e1 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 23 Jan 2025 20:57:27 +0000 Subject: [PATCH] added unit tests --- backend/SearchEpc.py | 83 ++++++++++++++++------- backend/apis/GoogleSolarApi.py | 27 ++++++-- backend/app/plan/router.py | 31 +++++++-- backend/tests/test_search_epc.py | 50 ++++++++++++++ etl/customers/l_and_g/ic_slides.py | 16 +++-- etl/customers/remote_assessments/app.py | 17 +++-- etl/find_my_epc/AssetListEpcData.py | 1 + recommendations/Costs.py | 5 +- recommendations/SolarPvRecommendations.py | 8 ++- recommendations/county_to_region.py | 7 +- 10 files changed, 190 insertions(+), 55 deletions(-) create mode 100644 backend/tests/test_search_epc.py diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py index d916f82f..c74a0b1f 100644 --- a/backend/SearchEpc.py +++ b/backend/SearchEpc.py @@ -139,8 +139,8 @@ class SearchEpc: } NODATA = { - "status": 201, - "message": "No data", + "status": 204, + "message": "no data", "error": None } @@ -155,7 +155,7 @@ class SearchEpc: uprn: [int, None] = None, size=None, property_type=None, - fast=False + fast=False, ): """ Address lines 1 and postcode are mandatory fields. The other address lines are optional @@ -248,14 +248,10 @@ class SearchEpc: else: return None - def get_epc(self, params=None, size=None): - # Get the EPC data with retries - size = size if size is not None else self.size - if params is None: - if self.uprn: - params = {"uprn": self.uprn} - else: - params = {"address": self.address1, "postcode": self.postcode} + def _get_epc(self, params, size): + """ + To be called by get_epc() - not for external usage + """ url = os.path.join(self.client.domestic.host, "search") if size: @@ -268,24 +264,20 @@ class SearchEpc: if response: self.data = response - return self.SUCCESS + return { + "response": response, + "msg": self.SUCCESS + } if retry > 0: logger.info("Failed previous attempt but retry successful") # If we got nothing, final try if not response: return { - "status": 204, - "message": "no data", - "error": None + "response": response, + "msg": self.NODATA } - return { - "status": 200, - "message": "success", - "error": None - } - except Exception as e: if retry < self.max_retries - 1: # If not the last retry, wait for 3 seconds before retrying @@ -293,11 +285,54 @@ class SearchEpc: else: # If it's the last retry, we continue return { - "status": 500, - "message": "Could not retrieve EPC data", - "error": str(e) + "response": {}, + "msg": { + "status": 500, + "message": "Could not retrieve EPC data", + "error": str(e) + } } + def get_epc(self, params=None, size=None): + # Get the EPC data with retries + size = size if size is not None else self.size + if params: + output = self._get_epc(params=params, size=size) + if output["msg"]["status"] == 200: + self.data = output["response"] + return output["msg"] + + uprn_params = {"uprn": self.uprn} if self.uprn else {} + address_params = {"address": self.address1, "postcode": self.postcode} + + # We attempt the search with uprn params + + data = {"rows": []} + if uprn_params: + api_response = self._get_epc(params=uprn_params, size=size) + if api_response["msg"]["status"] == 200: + data["rows"].extend(api_response["response"]["rows"]) + + # If we were unsuccessful, we then make a second attempt to fetch the data. We find that + # properties are sometimes listed under the wrong UPRN + api_response = self._get_epc(params=address_params, size=size) + if api_response["msg"]["status"] == 200: + # We update the data with the correct uprn + if self.uprn: + for x in api_response["response"]["rows"]: + x["uprn"] = self.uprn + + data["rows"].extend(api_response["response"]["rows"]) + + # We no de-dupe on lmk-key to avoid duplicates + seen = set() + data["rows"] = [ + row for row in data["rows"] + if row["lmk-key"] not in seen and not seen.add(row["lmk-key"]) + ] + + return api_response["msg"] + def filter_rows(self, rows, property_type=None, address=None): """ This method should not be used when property_type and address are both not None diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py index e2b7d933..183503d5 100644 --- a/backend/apis/GoogleSolarApi.py +++ b/backend/apis/GoogleSolarApi.py @@ -51,6 +51,9 @@ class GoogleSolarApi: MIN_UNIT_PANELS = 4 # Minimum number of panels we allow for a domestic building MIN_BUILDING_PANELS = 10 # Minimum number of panels we allow for a block of flats + # Max area of a roof space we allow panels for + PERCENTAGE_OF_ROOF_LIMIT = 0.8 + def __init__(self, api_key, max_retries=5): """ Initialize the GoogleSolarApi class with the provided API key and maximum retries. @@ -159,10 +162,11 @@ class GoogleSolarApi: # Automatically exclude north-facing segments self.exclude_north_facing_segments(property_instance=property_instance) # If a property is semi-detached, it's possible for us to include segments from an attached unit - if (property_instance.data["built-form"] == "Semi-Detached") and ( - property_instance.data["extension-count"] == 0 - ): - self.exclude_likely_duplicate_surfaces() + if property_instance is not None: + if (property_instance.data["built-form"] == "Semi-Detached") and ( + property_instance.data["extension-count"] == 0 + ): + self.exclude_likely_duplicate_surfaces() self.roof_area = self.insights_data["solarPotential"]["wholeRoofStats"]['areaMeters2'] self.floor_area = self.insights_data["solarPotential"]["wholeRoofStats"]['groundAreaMeters2'] @@ -179,7 +183,9 @@ class GoogleSolarApi: # We now start finding the solar panel configurations self.optimise_solar_configuration( - energy_consumption=energy_consumption, is_building=is_building, property_instance=property_instance + energy_consumption=energy_consumption, + is_building=is_building, + property_instance=property_instance ) # Finally, if we have a double property, we half the data we stored area @@ -295,7 +301,11 @@ class GoogleSolarApi: continue if cost_instance is None: - total_cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000) + total_cost = Costs.solar_pv( + n_panels=roi_summary["n_panels"].sum(), + has_battery=False, + n_floors=3, # Assume the most amount of scaffolding + )["total"] else: total_cost = cost_instance.solar_pv( n_panels=roi_summary["n_panels"].sum(), @@ -491,6 +501,11 @@ class GoogleSolarApi: panel_performance = panel_performance.drop(columns=["n_panels_halved"]) panel_performance = panel_performance[panel_performance["n_panels"] >= min_panels] + # Finally, we prevent pannelled roof area being above a limit + panel_performance = panel_performance[ + panel_performance["panneled_roof_area"] <= self.roof_area * self.PERCENTAGE_OF_ROOF_LIMIT + ] + self.panel_performance = panel_performance def exclude_north_facing_segments(self, property_instance): diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 6ca5d3d0..855fd9d6 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -339,6 +339,9 @@ def extract_property_request_data( # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN # we need to check existence of uprn has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else True + if has_uprn: + has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None] + if has_uprn: property_non_invasive_recommendations = next(( x for x in non_invasive_recommendations if @@ -366,10 +369,21 @@ def extract_property_request_data( property_non_invasive_recommendations["recommendations"] = str(transformed) - property_valution = next(( - float(x["valuation"]) for x in valuation_data if - (str(x["uprn"]) == str(uprn)) - ), None) + # Check if the valuation data has uprn + valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else True + if valuation_has_uprn: + valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None] + + if valuation_has_uprn: + property_valution = next(( + float(x["valuation"]) for x in valuation_data if + (str(x["uprn"]) == str(uprn)) + ), None) + else: + property_valution = next(( + float(x["valuation"]) for x in valuation_data if + (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + ), None) return patch, property_already_installed, property_non_invasive_recommendations, property_valution @@ -444,9 +458,12 @@ async def trigger_plan(body: PlanTriggerRequest): # Create a record in db property_id, is_new = create_property( - session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, - epc_searcher.uprn, - energy_assessment + session=session, + portfolio_id=body.portfolio_id, + address=epc_searcher.address_clean, + postcode=epc_searcher.postcode_clean, + uprn=epc_searcher.uprn, + energy_assessment=energy_assessment ) if not is_new and not body.multi_plan: continue diff --git a/backend/tests/test_search_epc.py b/backend/tests/test_search_epc.py new file mode 100644 index 00000000..3b2e2a5b --- /dev/null +++ b/backend/tests/test_search_epc.py @@ -0,0 +1,50 @@ +import pytest +import os +from backend.SearchEpc import SearchEpc # Replace with your actual module name +from dotenv import load_dotenv + +load_dotenv(dotenv_path="backend/.env") +EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") + + +class TestSearchEpcIntegration: + @pytest.mark.parametrize( + "address, postcode, uprn, skip_os, expected_partial_address", + [ + # Test case 1: Valid address and postcode, skipping OS + # In this case, the property is an individual flat but the uprn associated to the + # EPC is for the building as a whole, possibly because there was a conversion of sorts + ("Garden Flat, 48 Bedminster Parade", "BS3 4HS", 308249, True, + "260907a5431fa073d193cc6bbec51fbf1ba9a61845ab2503f85aa19ce3ed6afd", 1), + + # Test case 2: Another valid address and postcode + # In this case, the newest EPC, does not have a uprn associated to it. If we did a search by + # uprn, we would get an old EPC + ("Flat 8, Hainton House", "DN32 9AQ", 10090082018, True, + "bd1149a20a73397184f07a9955f872424826e70f4870c058d71be887766ee1f8", 3), + + ], + ) + def test_find_property(self, address, postcode, uprn, skip_os, lmk_key, n_old_epcs): + """ + Integration test for `find_property`, making actual API calls. + """ + # Provide your actual API keys or tokens here + os_api_key = "" + + # Initialize the SearchEpc instance + epc_searcher = SearchEpc( + address1=address, + postcode=postcode, + uprn=uprn, + auth_token=EPC_AUTH_TOKEN, + os_api_key=os_api_key, + ) + + # Execute the method + epc_searcher.find_property(skip_os=skip_os) + + # We check that we have the correct epc + assert epc_searcher.newest_epc["lmk-key"] == lmk_key + assert epc_searcher.newest_epc["uprn"] == uprn + assert len(epc_searcher.older_epcs) == n_old_epcs diff --git a/etl/customers/l_and_g/ic_slides.py b/etl/customers/l_and_g/ic_slides.py index 71b0945c..72dfc2c0 100644 --- a/etl/customers/l_and_g/ic_slides.py +++ b/etl/customers/l_and_g/ic_slides.py @@ -7,16 +7,20 @@ data = pd.read_csv( data["year_built"].value_counts() -# 1991-2002 139 -# 2003-2006 50 -# 1996-2002 42 -# 1976-1982 37 -# 1967-1975 37 -# 1983-1990 33 # 1950-1966 26 +# 1967-1975 37 +# 1976-1982 37 +# 1983-1990 33 +# 1991-1995 139 +# 1996-2002 42 +# 2003-2006 50 data["full_property_type"] = data["property_type"] + ": " + data["built_form"] +houses = data[data["property_type"].isin(["House", "Bungalow"])] +houses["built_form"].value_counts() + +data["property_type"].value_counts() data["full_property_type"].value_counts() # House: Mid-Terrace 136 # House: End-Terrace 83 diff --git a/etl/customers/remote_assessments/app.py b/etl/customers/remote_assessments/app.py index ccbc9ac8..13cdc41b 100644 --- a/etl/customers/remote_assessments/app.py +++ b/etl/customers/remote_assessments/app.py @@ -21,17 +21,20 @@ def app(): { "address": "Garden Flat, 48 Bedminster Parade", "postcode": "BS3 4HS", - "building_id": 1 + "building_id": 1, + "uprn": 308249, }, { - "addresss": "Top Floor Flat, 48 Bedminster Parade", + "address": "Top Floor Flat, 48 Bedminster Parade", "postcode": "BS3 4HS", - "building_id": 1 + "building_id": 1, + "uprn": 308251 }, { "address": "First Floor Flat, 48 Bedminster Parade", "postcode": "BS3 4HS", - "building_id": 1 + "building_id": 1, + "uprn": 308250, } ] asset_list = pd.DataFrame(asset_list) @@ -64,17 +67,17 @@ def app(): { "address": "Garden Flat, 48 Bedminster Parade", "postcode": "BS3 4HS", - "value": 337_000 + "valuation": 337_000 }, { "addresss": "Top Floor Flat, 48 Bedminster Parade", "postcode": "BS3 4HS", - "value": 337_000 + "valuation": 337_000 }, { "address": "First Floor Flat, 48 Bedminster Parade", "postcode": "BS3 4HS", - "value": 337_000 + "valuation": 337_000 } ] # Store valuation data to s3 diff --git a/etl/find_my_epc/AssetListEpcData.py b/etl/find_my_epc/AssetListEpcData.py index 7bd16090..bce8cd1f 100644 --- a/etl/find_my_epc/AssetListEpcData.py +++ b/etl/find_my_epc/AssetListEpcData.py @@ -90,4 +90,5 @@ class AssetListEpcData: } ) + self.extracted_data = extracted_data logger.info("Data Extrction complete") diff --git a/recommendations/Costs.py b/recommendations/Costs.py index ee4db7eb..2312dff2 100644 --- a/recommendations/Costs.py +++ b/recommendations/Costs.py @@ -719,8 +719,9 @@ class Costs: "labour_days": labour_days } + @classmethod def solar_pv( - self, + cls, n_panels: int | float, has_battery: bool = False, array_cost=None, @@ -774,7 +775,7 @@ class Costs: # We add an additional cost for scaffolding # The costs from installers exclude VAT - vat = subtotal * self.VAT_RATE + vat = subtotal * cls.VAT_RATE total_cost = subtotal + vat # Labour hours are based on estimates from online research but an average team seems to consist of 3 people diff --git a/recommendations/SolarPvRecommendations.py b/recommendations/SolarPvRecommendations.py index 66c1d0c3..ed5554dc 100644 --- a/recommendations/SolarPvRecommendations.py +++ b/recommendations/SolarPvRecommendations.py @@ -106,10 +106,16 @@ class SolarPvRecommendations: roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / total_roof_area * 100) else: raise Exception("IMPLEMENT ME") + + n_floors = ( + self.property.number_of_storeys["number_of_storeys"] if + self.property.number_of_storeys["number_of_storeys"] is not None else 3 + ) + total_cost = self.costs.solar_pv( array_cost=recommendation_config.get("cost", None), n_panels=recommendation_config["n_panels"], - n_floors=self.property.number_of_storeys["number_of_storeys"], + n_floors=n_floors, needs_inverter=True, )["total"] / n_units diff --git a/recommendations/county_to_region.py b/recommendations/county_to_region.py index f7d5193f..e84b5698 100644 --- a/recommendations/county_to_region.py +++ b/recommendations/county_to_region.py @@ -111,8 +111,11 @@ county_to_region_map = { 'Windsor and Maidenhead': 'South East England', 'Woking': 'South East England', 'Wokingham': 'South East England', 'Worthing': 'South East England', 'Wycombe': 'South East England', 'Bath and North East Somerset': 'South West England', 'Bournemouth': 'South West England', - 'Bristol': 'South West England', 'Cheltenham': 'South West England', 'Christchurch': 'South West England', - 'City of Bristol': 'South West England', 'Cornwall': 'South West England', 'Cotswold': 'South West England', + 'Bristol': 'South West England', + 'Cheltenham': 'South West England', 'Christchurch': 'South West England', + 'City of Bristol': 'South West England', + 'Bristol, City of': 'South West England', + 'Cornwall': 'South West England', 'Cotswold': 'South West England', 'Devon': 'South West England', 'Dorset': 'South West England', 'East Devon': 'South West England', 'East Dorset': 'South West England', 'Exeter': 'South West England', 'Forest of Dean': 'South West England', 'Gloucester': 'South West England', 'Gloucestershire': 'South West England',