added unit tests

This commit is contained in:
Khalim Conn-Kowlessar 2025-01-23 20:57:27 +00:00
parent 020ac42c5f
commit 0fad758fbb
10 changed files with 190 additions and 55 deletions

View file

@ -139,8 +139,8 @@ class SearchEpc:
}
NODATA = {
"status": 201,
"message": "No data",
"status": 204,
"message": "no data",
"error": None
}
@ -155,7 +155,7 @@ class SearchEpc:
uprn: [int, None] = None,
size=None,
property_type=None,
fast=False
fast=False,
):
"""
Address lines 1 and postcode are mandatory fields. The other address lines are optional
@ -248,14 +248,10 @@ class SearchEpc:
else:
return None
def get_epc(self, params=None, size=None):
# Get the EPC data with retries
size = size if size is not None else self.size
if params is None:
if self.uprn:
params = {"uprn": self.uprn}
else:
params = {"address": self.address1, "postcode": self.postcode}
def _get_epc(self, params, size):
"""
To be called by get_epc() - not for external usage
"""
url = os.path.join(self.client.domestic.host, "search")
if size:
@ -268,24 +264,20 @@ class SearchEpc:
if response:
self.data = response
return self.SUCCESS
return {
"response": response,
"msg": self.SUCCESS
}
if retry > 0:
logger.info("Failed previous attempt but retry successful")
# If we got nothing, final try
if not response:
return {
"status": 204,
"message": "no data",
"error": None
"response": response,
"msg": self.NODATA
}
return {
"status": 200,
"message": "success",
"error": None
}
except Exception as e:
if retry < self.max_retries - 1:
# If not the last retry, wait for 3 seconds before retrying
@ -293,11 +285,54 @@ class SearchEpc:
else:
# If it's the last retry, we continue
return {
"status": 500,
"message": "Could not retrieve EPC data",
"error": str(e)
"response": {},
"msg": {
"status": 500,
"message": "Could not retrieve EPC data",
"error": str(e)
}
}
def get_epc(self, params=None, size=None):
# Get the EPC data with retries
size = size if size is not None else self.size
if params:
output = self._get_epc(params=params, size=size)
if output["msg"]["status"] == 200:
self.data = output["response"]
return output["msg"]
uprn_params = {"uprn": self.uprn} if self.uprn else {}
address_params = {"address": self.address1, "postcode": self.postcode}
# We attempt the search with uprn params
data = {"rows": []}
if uprn_params:
api_response = self._get_epc(params=uprn_params, size=size)
if api_response["msg"]["status"] == 200:
data["rows"].extend(api_response["response"]["rows"])
# If we were unsuccessful, we then make a second attempt to fetch the data. We find that
# properties are sometimes listed under the wrong UPRN
api_response = self._get_epc(params=address_params, size=size)
if api_response["msg"]["status"] == 200:
# We update the data with the correct uprn
if self.uprn:
for x in api_response["response"]["rows"]:
x["uprn"] = self.uprn
data["rows"].extend(api_response["response"]["rows"])
# We no de-dupe on lmk-key to avoid duplicates
seen = set()
data["rows"] = [
row for row in data["rows"]
if row["lmk-key"] not in seen and not seen.add(row["lmk-key"])
]
return api_response["msg"]
def filter_rows(self, rows, property_type=None, address=None):
"""
This method should not be used when property_type and address are both not None

View file

@ -51,6 +51,9 @@ class GoogleSolarApi:
MIN_UNIT_PANELS = 4 # Minimum number of panels we allow for a domestic building
MIN_BUILDING_PANELS = 10 # Minimum number of panels we allow for a block of flats
# Max area of a roof space we allow panels for
PERCENTAGE_OF_ROOF_LIMIT = 0.8
def __init__(self, api_key, max_retries=5):
"""
Initialize the GoogleSolarApi class with the provided API key and maximum retries.
@ -159,10 +162,11 @@ class GoogleSolarApi:
# Automatically exclude north-facing segments
self.exclude_north_facing_segments(property_instance=property_instance)
# If a property is semi-detached, it's possible for us to include segments from an attached unit
if (property_instance.data["built-form"] == "Semi-Detached") and (
property_instance.data["extension-count"] == 0
):
self.exclude_likely_duplicate_surfaces()
if property_instance is not None:
if (property_instance.data["built-form"] == "Semi-Detached") and (
property_instance.data["extension-count"] == 0
):
self.exclude_likely_duplicate_surfaces()
self.roof_area = self.insights_data["solarPotential"]["wholeRoofStats"]['areaMeters2']
self.floor_area = self.insights_data["solarPotential"]["wholeRoofStats"]['groundAreaMeters2']
@ -179,7 +183,9 @@ class GoogleSolarApi:
# We now start finding the solar panel configurations
self.optimise_solar_configuration(
energy_consumption=energy_consumption, is_building=is_building, property_instance=property_instance
energy_consumption=energy_consumption,
is_building=is_building,
property_instance=property_instance
)
# Finally, if we have a double property, we half the data we stored area
@ -295,7 +301,11 @@ class GoogleSolarApi:
continue
if cost_instance is None:
total_cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000)
total_cost = Costs.solar_pv(
n_panels=roi_summary["n_panels"].sum(),
has_battery=False,
n_floors=3, # Assume the most amount of scaffolding
)["total"]
else:
total_cost = cost_instance.solar_pv(
n_panels=roi_summary["n_panels"].sum(),
@ -491,6 +501,11 @@ class GoogleSolarApi:
panel_performance = panel_performance.drop(columns=["n_panels_halved"])
panel_performance = panel_performance[panel_performance["n_panels"] >= min_panels]
# Finally, we prevent pannelled roof area being above a limit
panel_performance = panel_performance[
panel_performance["panneled_roof_area"] <= self.roof_area * self.PERCENTAGE_OF_ROOF_LIMIT
]
self.panel_performance = panel_performance
def exclude_north_facing_segments(self, property_instance):

View file

@ -339,6 +339,9 @@ def extract_property_request_data(
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else True
if has_uprn:
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
if has_uprn:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
@ -366,10 +369,21 @@ def extract_property_request_data(
property_non_invasive_recommendations["recommendations"] = str(transformed)
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
# Check if the valuation data has uprn
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else True
if valuation_has_uprn:
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
if valuation_has_uprn:
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
else:
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), None)
return patch, property_already_installed, property_non_invasive_recommendations, property_valution
@ -444,9 +458,12 @@ async def trigger_plan(body: PlanTriggerRequest):
# Create a record in db
property_id, is_new = create_property(
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean,
epc_searcher.uprn,
energy_assessment
session=session,
portfolio_id=body.portfolio_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
uprn=epc_searcher.uprn,
energy_assessment=energy_assessment
)
if not is_new and not body.multi_plan:
continue

View file

@ -0,0 +1,50 @@
import pytest
import os
from backend.SearchEpc import SearchEpc # Replace with your actual module name
from dotenv import load_dotenv
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
class TestSearchEpcIntegration:
@pytest.mark.parametrize(
"address, postcode, uprn, skip_os, expected_partial_address",
[
# Test case 1: Valid address and postcode, skipping OS
# In this case, the property is an individual flat but the uprn associated to the
# EPC is for the building as a whole, possibly because there was a conversion of sorts
("Garden Flat, 48 Bedminster Parade", "BS3 4HS", 308249, True,
"260907a5431fa073d193cc6bbec51fbf1ba9a61845ab2503f85aa19ce3ed6afd", 1),
# Test case 2: Another valid address and postcode
# In this case, the newest EPC, does not have a uprn associated to it. If we did a search by
# uprn, we would get an old EPC
("Flat 8, Hainton House", "DN32 9AQ", 10090082018, True,
"bd1149a20a73397184f07a9955f872424826e70f4870c058d71be887766ee1f8", 3),
],
)
def test_find_property(self, address, postcode, uprn, skip_os, lmk_key, n_old_epcs):
"""
Integration test for `find_property`, making actual API calls.
"""
# Provide your actual API keys or tokens here
os_api_key = ""
# Initialize the SearchEpc instance
epc_searcher = SearchEpc(
address1=address,
postcode=postcode,
uprn=uprn,
auth_token=EPC_AUTH_TOKEN,
os_api_key=os_api_key,
)
# Execute the method
epc_searcher.find_property(skip_os=skip_os)
# We check that we have the correct epc
assert epc_searcher.newest_epc["lmk-key"] == lmk_key
assert epc_searcher.newest_epc["uprn"] == uprn
assert len(epc_searcher.older_epcs) == n_old_epcs

View file

@ -7,16 +7,20 @@ data = pd.read_csv(
data["year_built"].value_counts()
# 1991-2002 139
# 2003-2006 50
# 1996-2002 42
# 1976-1982 37
# 1967-1975 37
# 1983-1990 33
# 1950-1966 26
# 1967-1975 37
# 1976-1982 37
# 1983-1990 33
# 1991-1995 139
# 1996-2002 42
# 2003-2006 50
data["full_property_type"] = data["property_type"] + ": " + data["built_form"]
houses = data[data["property_type"].isin(["House", "Bungalow"])]
houses["built_form"].value_counts()
data["property_type"].value_counts()
data["full_property_type"].value_counts()
# House: Mid-Terrace 136
# House: End-Terrace 83

View file

@ -21,17 +21,20 @@ def app():
{
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1
"building_id": 1,
"uprn": 308249,
},
{
"addresss": "Top Floor Flat, 48 Bedminster Parade",
"address": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1
"building_id": 1,
"uprn": 308251
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1
"building_id": 1,
"uprn": 308250,
}
]
asset_list = pd.DataFrame(asset_list)
@ -64,17 +67,17 @@ def app():
{
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"value": 337_000
"valuation": 337_000
},
{
"addresss": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"value": 337_000
"valuation": 337_000
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"value": 337_000
"valuation": 337_000
}
]
# Store valuation data to s3

View file

@ -90,4 +90,5 @@ class AssetListEpcData:
}
)
self.extracted_data = extracted_data
logger.info("Data Extrction complete")

View file

@ -719,8 +719,9 @@ class Costs:
"labour_days": labour_days
}
@classmethod
def solar_pv(
self,
cls,
n_panels: int | float,
has_battery: bool = False,
array_cost=None,
@ -774,7 +775,7 @@ class Costs:
# We add an additional cost for scaffolding
# The costs from installers exclude VAT
vat = subtotal * self.VAT_RATE
vat = subtotal * cls.VAT_RATE
total_cost = subtotal + vat
# Labour hours are based on estimates from online research but an average team seems to consist of 3 people

View file

@ -106,10 +106,16 @@ class SolarPvRecommendations:
roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / total_roof_area * 100)
else:
raise Exception("IMPLEMENT ME")
n_floors = (
self.property.number_of_storeys["number_of_storeys"] if
self.property.number_of_storeys["number_of_storeys"] is not None else 3
)
total_cost = self.costs.solar_pv(
array_cost=recommendation_config.get("cost", None),
n_panels=recommendation_config["n_panels"],
n_floors=self.property.number_of_storeys["number_of_storeys"],
n_floors=n_floors,
needs_inverter=True,
)["total"] / n_units

View file

@ -111,8 +111,11 @@ county_to_region_map = {
'Windsor and Maidenhead': 'South East England', 'Woking': 'South East England', 'Wokingham': 'South East England',
'Worthing': 'South East England', 'Wycombe': 'South East England',
'Bath and North East Somerset': 'South West England', 'Bournemouth': 'South West England',
'Bristol': 'South West England', 'Cheltenham': 'South West England', 'Christchurch': 'South West England',
'City of Bristol': 'South West England', 'Cornwall': 'South West England', 'Cotswold': 'South West England',
'Bristol': 'South West England',
'Cheltenham': 'South West England', 'Christchurch': 'South West England',
'City of Bristol': 'South West England',
'Bristol, City of': 'South West England',
'Cornwall': 'South West England', 'Cotswold': 'South West England',
'Devon': 'South West England', 'Dorset': 'South West England', 'East Devon': 'South West England',
'East Dorset': 'South West England', 'Exeter': 'South West England', 'Forest of Dean': 'South West England',
'Gloucester': 'South West England', 'Gloucestershire': 'South West England',