diff --git a/.idea/Model.iml b/.idea/Model.iml
index df6c4faa..762580d9 100644
--- a/.idea/Model.iml
+++ b/.idea/Model.iml
@@ -7,7 +7,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 50cad4ca..c916a158 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/backend/Funding.py b/backend/Funding.py
index 8a9b08ae..f0780c51 100644
--- a/backend/Funding.py
+++ b/backend/Funding.py
@@ -12,6 +12,8 @@ class Funding:
and flag any tenant specific requirements that need to be considered to the funding to be attained
"""
+ SCHEMES = ["eco4", "gbis", "whlg"]
+
ECO_SAP_SCORE_THREHOLDS = [
{'Band': 'High_A', 'From': 96.0, 'Up to': 100.0, 'Mid-point': 98.0},
{'Band': 'Low_A', 'From': 92.0, 'Up to': 96.0, 'Mid-point': 94.0},
@@ -34,10 +36,12 @@ class Funding:
tenure: HousingType,
starting_epc,
starting_sap,
+ postcode,
floor_area,
council_tax_band,
property_recommendations,
project_scores_matrix,
+ whlg_eligible_postcodes,
gbis_abs_rate: int,
eco4_abs_rate: int,
):
@@ -47,6 +51,10 @@ class Funding:
:param starting_epc: The current EPC rating of the property
:param starting_sap: The current SAP score for the property
:param floor_area: The total floor area of the property
+ :param council_tax_band: The council tax band of the property
+ :param property_recommendations: The recommendations for the property
+ :param project_scores_matrix: The matrix of project scores for ECO4
+ :param whlg_eligible_postcodes: The postcodes eligible for WHLG
:param gbis_abs_rate: The assumed £/abs achieved by the installer for GBIS
:param eco4_abs_rate: The assumed £/abs achieved by the installer for ECO4
"""
@@ -58,6 +66,7 @@ class Funding:
self.tenure = tenure
self.starting_epc = starting_epc
self.starting_sap = starting_sap
+ self.postcode = postcode
self.starting_eco_band = self.sap_to_eco_band(self.starting_sap)
self.floor_area_segment = self.classify_floor_area(floor_area)
self.gbis_abs_rate = gbis_abs_rate
@@ -75,6 +84,11 @@ class Funding:
(project_scores_matrix["Starting Band"] == self.starting_eco_band)
]
+ # The postcode column is already lower case
+ self.whlg_eligible_postcodes = whlg_eligible_postcodes[
+ whlg_eligible_postcodes["Postcode"] == self.postcode.lower()
+ ]
+
# Store the final outputs
self.gbis_eligibiltiy = {}
self.eco4_eligibility = {}
@@ -82,6 +96,8 @@ class Funding:
def output(
self,
+ scheme: str,
+ eligible: bool,
measure_types: List[str],
estimated_funding: float,
notify_tenant_benefits_requirements: bool,
@@ -90,12 +106,18 @@ class Funding:
):
""""
"""
+
+ if scheme not in self.SCHEMES:
+ raise ValueError("Scheme not recognised")
+
return {
+ "scheme": scheme,
+ "eligible": eligible,
"measure_types": measure_types,
"estimated_funding": estimated_funding,
- "notify_tenant_benefits_requirements": notify_tenant_benefits_requirements,
- "notify_council_tax_band_requirements": notify_council_tax_band_requirements,
- "notify_tenant_low_income_requirements": notify_tenant_low_income_requirements
+ "requires_benefits": notify_tenant_benefits_requirements,
+ "requires_council_tax_band": notify_council_tax_band_requirements,
+ "requires_low_income": notify_tenant_low_income_requirements
}
@staticmethod
@@ -234,6 +256,8 @@ class Funding:
# If the council tax band is missing, we nofify the customer that this is a requirement that
# should be checked
return self.output(
+ scheme="gbis",
+ eligible=True,
measure_types=[recommended_measure["measure_type"]],
estimated_funding=recommended_measure["estimated_funding"],
notify_tenant_benefits_requirements=False,
@@ -251,6 +275,8 @@ class Funding:
# We find the best measure for GBIS
recommended_measure = self.find_best_gbis_measure(measures=valid_measures)
return self.output(
+ scheme="gbis",
+ eligible=True,
measure_types=[recommended_measure["measure_type"]],
estimated_funding=recommended_measure["estimated_funding"],
notify_tenant_benefits_requirements=True,
@@ -260,6 +286,8 @@ class Funding:
# Otherwise, no funding availability
return self.output(
+ scheme="gbis",
+ eligible=False,
measure_types=[],
estimated_funding=0,
notify_tenant_benefits_requirements=False,
@@ -279,6 +307,23 @@ class Funding:
raise NotImplementedError("Implement social/oo")
+ def whlg(self):
+ if self.tenure == "Social":
+ # We can't do anything for social housing
+ self.whlg_eligibility = self.output(
+ scheme="whlg",
+ eligible=False,
+ measure_types=[],
+ estimated_funding=0,
+ notify_tenant_benefits_requirements=False,
+ notify_council_tax_band_requirements=False,
+ notify_tenant_low_income_requirements=False
+ )
+ return
+
+ if not self.whlg_eligible_postcodes.empty:
+ print("Eligible implement me!")
+
def eco4(self):
if self.tenure == "Private":
self.eco4_eligibiltiy = self.eco4_prs()
@@ -292,4 +337,4 @@ class Funding:
self.gbis()
# self.eco4()
- # self.whlg()
+ self.whlg()
diff --git a/backend/Property.py b/backend/Property.py
index 0b63b266..a495431f 100644
--- a/backend/Property.py
+++ b/backend/Property.py
@@ -133,9 +133,14 @@ class Property:
self.energy_cost_estimates = {}
self.energy_consumption_estimates = {}
+ # when storing the energy, we'll also
self.energy = {
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
- "co2_emissions": epc_record.get("co2_emissions_current"),
+ "epc_co2_emissions": epc_record.get("co2_emissions_current"),
+ # These will be added in once we estimate the amount of emissions from appliances - using the carbon
+ # intensity of electricity
+ "appliances_co2_emissions": None,
+ "co2_emissions": None
}
self.ventilation = {
"ventilation": epc_record.get("mechanical_ventilation"),
@@ -725,6 +730,15 @@ class Property:
"unadjusted": unadjusted_kwh_estimates
}
+ # Update carbon with appliances
+ self.energy["appliances_co2_emissions"] = (
+ (unadjusted_kwh_estimates["appliances"] * assumptions.ELECTRICITY_CARBON_INTENSITY) / 1000
+ )
+ # Re-calculate total CO2 emissions
+ self.energy["co2_emissions"] = float(np.round(
+ self.energy["epc_co2_emissions"] + self.energy["appliances_co2_emissions"], 2
+ ))
+
def set_spatial(self, spatial: pd.DataFrame):
"""
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py
index 8ec4fdbe..c74a0b1f 100644
--- a/backend/SearchEpc.py
+++ b/backend/SearchEpc.py
@@ -139,8 +139,8 @@ class SearchEpc:
}
NODATA = {
- "status": 201,
- "message": "No data",
+ "status": 204,
+ "message": "no data",
"error": None
}
@@ -155,7 +155,7 @@ class SearchEpc:
uprn: [int, None] = None,
size=None,
property_type=None,
- fast=False
+ fast=False,
):
"""
Address lines 1 and postcode are mandatory fields. The other address lines are optional
@@ -248,14 +248,10 @@ class SearchEpc:
else:
return None
- def get_epc(self, params=None, size=None):
- # Get the EPC data with retries
- size = size if size is not None else self.size
- if params is None:
- if self.uprn:
- params = {"uprn": self.uprn}
- else:
- params = {"address": self.address1, "postcode": self.postcode}
+ def _get_epc(self, params, size):
+ """
+ To be called by get_epc() - not for external usage
+ """
url = os.path.join(self.client.domestic.host, "search")
if size:
@@ -268,24 +264,20 @@ class SearchEpc:
if response:
self.data = response
- return self.SUCCESS
+ return {
+ "response": response,
+ "msg": self.SUCCESS
+ }
if retry > 0:
logger.info("Failed previous attempt but retry successful")
# If we got nothing, final try
if not response:
return {
- "status": 204,
- "message": "no data",
- "error": None
+ "response": response,
+ "msg": self.NODATA
}
- return {
- "status": 200,
- "message": "success",
- "error": None
- }
-
except Exception as e:
if retry < self.max_retries - 1:
# If not the last retry, wait for 3 seconds before retrying
@@ -293,11 +285,54 @@ class SearchEpc:
else:
# If it's the last retry, we continue
return {
- "status": 500,
- "message": "Could not retrieve EPC data",
- "error": str(e)
+ "response": {},
+ "msg": {
+ "status": 500,
+ "message": "Could not retrieve EPC data",
+ "error": str(e)
+ }
}
+ def get_epc(self, params=None, size=None):
+ # Get the EPC data with retries
+ size = size if size is not None else self.size
+ if params:
+ output = self._get_epc(params=params, size=size)
+ if output["msg"]["status"] == 200:
+ self.data = output["response"]
+ return output["msg"]
+
+ uprn_params = {"uprn": self.uprn} if self.uprn else {}
+ address_params = {"address": self.address1, "postcode": self.postcode}
+
+ # We attempt the search with uprn params
+
+ data = {"rows": []}
+ if uprn_params:
+ api_response = self._get_epc(params=uprn_params, size=size)
+ if api_response["msg"]["status"] == 200:
+ data["rows"].extend(api_response["response"]["rows"])
+
+ # If we were unsuccessful, we then make a second attempt to fetch the data. We find that
+ # properties are sometimes listed under the wrong UPRN
+ api_response = self._get_epc(params=address_params, size=size)
+ if api_response["msg"]["status"] == 200:
+ # We update the data with the correct uprn
+ if self.uprn:
+ for x in api_response["response"]["rows"]:
+ x["uprn"] = self.uprn
+
+ data["rows"].extend(api_response["response"]["rows"])
+
+ # We no de-dupe on lmk-key to avoid duplicates
+ seen = set()
+ data["rows"] = [
+ row for row in data["rows"]
+ if row["lmk-key"] not in seen and not seen.add(row["lmk-key"])
+ ]
+
+ return api_response["msg"]
+
def filter_rows(self, rows, property_type=None, address=None):
"""
This method should not be used when property_type and address are both not None
@@ -693,9 +728,20 @@ class SearchEpc:
estimated_epc[variable] = str(int(estimated_epc[variable]))
# This is a string
- estimated_epc["low-energy-fixed-light-count"] = str(estimated_epc["low-energy-fixed-light-count"])
+ estimated_epc["low-energy-fixed-light-count"] = (
+ str(estimated_epc["low-energy-fixed-light-count"]) if estimated_epc["low-energy-fixed-light-count"] else ""
+ )
+ # This is an int
+ estimated_epc["photo-supply"] = (
+ int(np.round(estimated_epc["photo-supply"])) if estimated_epc["photo-supply"] else estimated_epc[
+ "photo-supply"]
+ )
estimated_epc["postcode"] = self.postcode
+ if not self.uprn:
+ # Update self.uprn too
+ self.uprn = hash(self.address1 + self.postcode)
+
estimated_epc["uprn"] = self.uprn
estimated_epc["address"] = self.full_address
# Indicate that this epc was estimated
diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py
index e2b7d933..183503d5 100644
--- a/backend/apis/GoogleSolarApi.py
+++ b/backend/apis/GoogleSolarApi.py
@@ -51,6 +51,9 @@ class GoogleSolarApi:
MIN_UNIT_PANELS = 4 # Minimum number of panels we allow for a domestic building
MIN_BUILDING_PANELS = 10 # Minimum number of panels we allow for a block of flats
+ # Max area of a roof space we allow panels for
+ PERCENTAGE_OF_ROOF_LIMIT = 0.8
+
def __init__(self, api_key, max_retries=5):
"""
Initialize the GoogleSolarApi class with the provided API key and maximum retries.
@@ -159,10 +162,11 @@ class GoogleSolarApi:
# Automatically exclude north-facing segments
self.exclude_north_facing_segments(property_instance=property_instance)
# If a property is semi-detached, it's possible for us to include segments from an attached unit
- if (property_instance.data["built-form"] == "Semi-Detached") and (
- property_instance.data["extension-count"] == 0
- ):
- self.exclude_likely_duplicate_surfaces()
+ if property_instance is not None:
+ if (property_instance.data["built-form"] == "Semi-Detached") and (
+ property_instance.data["extension-count"] == 0
+ ):
+ self.exclude_likely_duplicate_surfaces()
self.roof_area = self.insights_data["solarPotential"]["wholeRoofStats"]['areaMeters2']
self.floor_area = self.insights_data["solarPotential"]["wholeRoofStats"]['groundAreaMeters2']
@@ -179,7 +183,9 @@ class GoogleSolarApi:
# We now start finding the solar panel configurations
self.optimise_solar_configuration(
- energy_consumption=energy_consumption, is_building=is_building, property_instance=property_instance
+ energy_consumption=energy_consumption,
+ is_building=is_building,
+ property_instance=property_instance
)
# Finally, if we have a double property, we half the data we stored area
@@ -295,7 +301,11 @@ class GoogleSolarApi:
continue
if cost_instance is None:
- total_cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000)
+ total_cost = Costs.solar_pv(
+ n_panels=roi_summary["n_panels"].sum(),
+ has_battery=False,
+ n_floors=3, # Assume the most amount of scaffolding
+ )["total"]
else:
total_cost = cost_instance.solar_pv(
n_panels=roi_summary["n_panels"].sum(),
@@ -491,6 +501,11 @@ class GoogleSolarApi:
panel_performance = panel_performance.drop(columns=["n_panels_halved"])
panel_performance = panel_performance[panel_performance["n_panels"] >= min_panels]
+ # Finally, we prevent pannelled roof area being above a limit
+ panel_performance = panel_performance[
+ panel_performance["panneled_roof_area"] <= self.roof_area * self.PERCENTAGE_OF_ROOF_LIMIT
+ ]
+
self.panel_performance = panel_performance
def exclude_north_facing_segments(self, property_instance):
diff --git a/backend/app/assumptions.py b/backend/app/assumptions.py
index 44838a47..841ec2c1 100644
--- a/backend/app/assumptions.py
+++ b/backend/app/assumptions.py
@@ -1,7 +1,7 @@
-# Assumes that the average efficiency of an air source heat pump is 250%, taking the median of the 200-400% range,
-# which is often quoted as a sensible efficiency range for air source heat pumps.
+# We assume that the ASHP efficiency is 280%, which is the minimum that Cotswolds Energy Group achieves, as
+# they target this
PESSIMISTIC_ASHP_EFFICIENCY = 200
-AVERAGE_ASHP_EFFICIENCY = 250
+AVERAGE_ASHP_EFFICIENCY = 280
# Conservative estimate of the proportion of electricity that will be consumed, whereas the rest will
# be exported. These are averages based on Google research. E.g
@@ -14,6 +14,9 @@ RDSAP_AREA_PER_PANEL = 3.4
SOCIAL_TENURES = ["Rented (social)", "rental (social)"]
+# Carbon intensity of electricity, as of 16th Jan 2025
+ELECTRICITY_CARBON_INTENSITY = 0.232
+
DESCRIPTIONS_TO_FUEL_TYPES = {
"Air source heat pump, radiators, electric": {
"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100
diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py
index fb896659..04a2ef7f 100644
--- a/backend/app/plan/router.py
+++ b/backend/app/plan/router.py
@@ -121,7 +121,7 @@ def extract_portfolio_aggregation_data(
# We can now calculate multiple outputs based on default recommendations
carbon_savings = sum([r["co2_equivalent_savings"] for r in default_recommendations])
- pre_retrofit_co2 = p.data["co2-emissions-current"]
+ pre_retrofit_co2 = p.energy["co2_emissions"]
post_retrofit_co2 = pre_retrofit_co2 - carbon_savings
pre_retrofit_energy_bill = sum(p.current_energy_bill.values())
@@ -339,6 +339,9 @@ def extract_property_request_data(
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else True
+ if has_uprn:
+ has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
+
if has_uprn:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
@@ -366,23 +369,45 @@ def extract_property_request_data(
property_non_invasive_recommendations["recommendations"] = str(transformed)
- property_valution = next((
- float(x["valuation"]) for x in valuation_data if
- (str(x["uprn"]) == str(uprn))
- ), None)
+ # Check if the valuation data has uprn
+ valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else True
+ if valuation_has_uprn:
+ valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
+
+ if valuation_has_uprn:
+ property_valution = next((
+ float(x["valuation"]) for x in valuation_data if
+ (str(x["uprn"]) == str(uprn))
+ ), None)
+ else:
+ property_valution = next((
+ float(x["valuation"]) for x in valuation_data if
+ (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
+ ), None)
return patch, property_already_installed, property_non_invasive_recommendations, property_valution
-def get_eco_project_scores_matrix():
- data = read_csv_from_s3(
+def get_funding_data():
+ """
+ This function retrieves the eco project scores matrix and the warm homes local grant funding data
+ :return:
+ """
+ project_scores_matrix = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/ECO4 Full Project Scores Matrix.csv",
)
- df = pd.DataFrame(data)
- df.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
- df["Cost Savings"] = df["Cost Savings"].astype(float)
- return df
+ project_scores_matrix = pd.DataFrame(project_scores_matrix)
+ project_scores_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
+ project_scores_matrix["Cost Savings"] = project_scores_matrix["Cost Savings"].astype(float)
+
+ whlg_eligible_postcodes = read_csv_from_s3(
+ bucket_name=get_settings().DATA_BUCKET,
+ filepath="funding/whlg eligible postcodes.csv",
+ )
+ whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
+
+ return project_scores_matrix, whlg_eligible_postcodes
router = APIRouter(
@@ -407,6 +432,7 @@ async def trigger_plan(body: PlanTriggerRequest):
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
+
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):
@@ -443,9 +469,12 @@ async def trigger_plan(body: PlanTriggerRequest):
# Create a record in db
property_id, is_new = create_property(
- session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean,
- epc_searcher.uprn,
- energy_assessment
+ session=session,
+ portfolio_id=body.portfolio_id,
+ address=epc_searcher.address_clean,
+ postcode=epc_searcher.postcode_clean,
+ uprn=epc_searcher.uprn,
+ energy_assessment=energy_assessment
)
if not is_new and not body.multi_plan:
continue
@@ -526,7 +555,7 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Reading in materials and cleaned datasets")
materials = get_materials(session)
cleaned = get_cleaned()
- eco_project_scores_matrix = get_eco_project_scores_matrix()
+ eco_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
@@ -670,9 +699,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# Insert the predictions into the recommendations and run the optimiser
# TODO: If a recommendation has a negative impact on SAP, we should remove it - this seems to have become a
- # possibility with heating system
- # TODO: After optimising, if there are any cheap, quick win measures (e.g. insulate water tank with hot water
- # cylinder jacket), we should add these to the recommendations as default
+ # possibility with heating system?
for p in input_properties:
if not recommendations.get(p.id):
@@ -680,37 +707,42 @@ async def trigger_plan(body: PlanTriggerRequest):
input_measures = prepare_input_measures(recommendations[p.id], body.goal)
- current_sap_points = int(p.data["current-energy-efficiency"])
- target_sap_points = epc_to_sap_lower_bound(body.goal_value)
- sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
-
- if not body.optimise:
- if body.goal != "Increasing EPC":
- raise NotImplementedError("Only EPC optimisation is currently supported")
- solution = []
- for sub_list in input_measures:
- # Select the entry with the highest gain, and if tied, choose the one with the lowest cost
- best_measure = max(sub_list, key=lambda x: (x['gain'], -x['cost']))
- solution.append(best_measure)
+ if not input_measures[0]:
+ # This means that we have no defaults
+ selected_recommendations = {}
else:
- if body.budget:
- optimiser = GainOptimiser(
- input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
- )
+ current_sap_points = int(p.data["current-energy-efficiency"])
+ target_sap_points = epc_to_sap_lower_bound(body.goal_value)
+ sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
+
+ if not body.optimise:
+ if body.goal != "Increasing EPC":
+ raise NotImplementedError("Only EPC optimisation is currently supported")
+ solution = []
+ for sub_list in input_measures:
+ # Select the entry with the highest gain, and if tied, choose the one with the lowest cost
+ best_measure = max(sub_list, key=lambda x: (x['gain'], -x['cost']))
+ solution.append(best_measure)
else:
- # The minimum gain is the minimum number of SAP points required to get to the target SAP band
- # If the gain is negative, the optimiser will return an empty solution
- optimiser = CostOptimiser(
- input_measures,
- min_gain=sap_gain
- )
- optimiser.setup()
- optimiser.solve()
- solution = optimiser.solution
+ if body.budget:
+ optimiser = GainOptimiser(
+ input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
+ )
+ else:
+ # The minimum gain is the minimum number of SAP points required to get to the target SAP band
+ # If the gain is negative, the optimiser will return an empty solution
+ optimiser = CostOptimiser(
+ input_measures,
+ min_gain=sap_gain
+ )
- selected_recommendations = {r["id"] for r in solution}
+ optimiser.setup()
+ optimiser.solve()
+ solution = optimiser.solution
+
+ selected_recommendations = {r["id"] for r in solution}
# If wall insulation is selected, we also include mechanical ventilation as a best practice measure
if any(x in [r["type"] for r in solution] for x in [
@@ -749,25 +781,53 @@ async def trigger_plan(body: PlanTriggerRequest):
]
recommendations[p.id] = final_recommendations
+ # when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all
+ # of them
+ # TODO: We can probably do better and optimise at the building level - this is temp
+ logger.info("Adjusting solar PV recommendations for buildings")
+ building_ids = set([p.building_id for p in input_properties if p.building_id is not None])
+
+ for bid in building_ids:
+ # We check if any of them have solar PV
+ building = [p for p in input_properties if p.building_id == bid]
+ has_solar = False
+ for unit in building:
+ # Get default recommendations
+ has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0
+ if has_solar:
+ break
+
+ if has_solar:
+ # We adjust the units within the building
+ for unit in building:
+ for rec in recommendations[unit.id]:
+ if rec["type"] == "solar_pv":
+ # This is straightforward, we just set the default to True, since when we're at a building
+ # level, we only allow 1 solar PV option for each unit. If we change this, this logic will
+ # need to be updated
+ rec["default"] = True
+
# ~~~~~~~~~~~~~~~~
# Funding
# ~~~~~~~~~~~~~~~~
- # for p in input_properties:
- # funding_calulator = Funding(
- # tenure=body.housing_type,
- # starting_epc=p.data["current-energy-rating"],
- # starting_sap=int(p.data["current-energy-efficiency"]),
- # floor_area=p.floor_area,
- # council_tax_band=None, # This is seemingly always None at the moment
- # property_recommendations=recommendations[p.id],
- # project_scores_matrix=eco_project_scores_matrix,
- # gbis_abs_rate=20,
- # eco4_abs_rate=20,
- # )
- # funding_calulator.check_eligibiltiy()
- # # Insert finding
- # p.insert_funding(funding_calulator)
+ for p in input_properties:
+ funding_calulator = Funding(
+ tenure=body.housing_type,
+ starting_epc=p.data["current-energy-rating"],
+ starting_sap=int(p.data["current-energy-efficiency"]),
+ postcode=p.postcode,
+ floor_area=p.floor_area,
+ council_tax_band=None, # This is seemingly always None at the moment
+ property_recommendations=recommendations[p.id],
+ project_scores_matrix=eco_project_scores_matrix,
+ whlg_eligible_postcodes=whlg_eligible_postcodes,
+ gbis_abs_rate=20,
+ eco4_abs_rate=15,
+ )
+ funding_calulator.check_eligibiltiy()
+ # Insert finding
+ p.insert_funding(funding_calulator)
logger.info("Uploading recommendations to the database")
# If we have any work to do, we create a new scenario
diff --git a/backend/ml_models/AnnualBillSavings.py b/backend/ml_models/AnnualBillSavings.py
index 211e5ea6..b22837d8 100644
--- a/backend/ml_models/AnnualBillSavings.py
+++ b/backend/ml_models/AnnualBillSavings.py
@@ -28,8 +28,8 @@ class AnnualBillSavings:
# Latest price cap figures from Ofgem are for April 2024
# https://www.ofgem.gov.uk/energy-price-cap
- ELECTRICITY_PRICE_CAP = 0.2236
- GAS_PRICE_CAP = 0.0548
+ ELECTRICITY_PRICE_CAP = 0.2486
+ GAS_PRICE_CAP = 0.0634
# This is the most recent export payment figure, at 9.28p/kWh
# Smart export guarantee rates can be found here:
# https://www.sunsave.energy/solar-panels-advice/exporting-to-the-grid/best-seg-rates
@@ -39,8 +39,8 @@ class AnnualBillSavings:
PRICE_FACTOR = 0.09549999999999999
# Daily standard charge, based on average across England, Scotland and Wales, and includes VAT
- DAILY_STANDARD_CHARGE_GAS = 0.3143
- DAILY_STANDARD_CHARGE_ELECTRICITY = 0.601
+ DAILY_STANDARD_CHARGE_GAS = 0.3165
+ DAILY_STANDARD_CHARGE_ELECTRICITY = 0.6097
# Based on https://www.nottenergy.com/advice-and-tools/project-energy-cost-comparison
# For July 2024. These quotes are based on the east midlands region, so we
diff --git a/backend/tests/test_search_epc.py b/backend/tests/test_search_epc.py
new file mode 100644
index 00000000..3b2e2a5b
--- /dev/null
+++ b/backend/tests/test_search_epc.py
@@ -0,0 +1,50 @@
+import pytest
+import os
+from backend.SearchEpc import SearchEpc # Replace with your actual module name
+from dotenv import load_dotenv
+
+load_dotenv(dotenv_path="backend/.env")
+EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
+
+
+class TestSearchEpcIntegration:
+ @pytest.mark.parametrize(
+ "address, postcode, uprn, skip_os, expected_partial_address",
+ [
+ # Test case 1: Valid address and postcode, skipping OS
+ # In this case, the property is an individual flat but the uprn associated to the
+ # EPC is for the building as a whole, possibly because there was a conversion of sorts
+ ("Garden Flat, 48 Bedminster Parade", "BS3 4HS", 308249, True,
+ "260907a5431fa073d193cc6bbec51fbf1ba9a61845ab2503f85aa19ce3ed6afd", 1),
+
+ # Test case 2: Another valid address and postcode
+ # In this case, the newest EPC, does not have a uprn associated to it. If we did a search by
+ # uprn, we would get an old EPC
+ ("Flat 8, Hainton House", "DN32 9AQ", 10090082018, True,
+ "bd1149a20a73397184f07a9955f872424826e70f4870c058d71be887766ee1f8", 3),
+
+ ],
+ )
+ def test_find_property(self, address, postcode, uprn, skip_os, lmk_key, n_old_epcs):
+ """
+ Integration test for `find_property`, making actual API calls.
+ """
+ # Provide your actual API keys or tokens here
+ os_api_key = ""
+
+ # Initialize the SearchEpc instance
+ epc_searcher = SearchEpc(
+ address1=address,
+ postcode=postcode,
+ uprn=uprn,
+ auth_token=EPC_AUTH_TOKEN,
+ os_api_key=os_api_key,
+ )
+
+ # Execute the method
+ epc_searcher.find_property(skip_os=skip_os)
+
+ # We check that we have the correct epc
+ assert epc_searcher.newest_epc["lmk-key"] == lmk_key
+ assert epc_searcher.newest_epc["uprn"] == uprn
+ assert len(epc_searcher.older_epcs) == n_old_epcs
diff --git a/etl/access_reporting/app.py b/etl/access_reporting/app.py
index 830f4370..8a8254a1 100644
--- a/etl/access_reporting/app.py
+++ b/etl/access_reporting/app.py
@@ -83,8 +83,11 @@ def api_call_decorator(func):
results = []
page_size = kwargs.get('page_size', None)
response_data = {}
+ n_calls = 0
while url:
+ logger.info("Making call for page: " + str(n_calls + 1))
+ n_calls += 1
response = requests.request(http_method, url, headers=self.headers, json=data)
# Handle the response
@@ -93,6 +96,7 @@ def api_call_decorator(func):
if page_size:
results.extend(response_json.get('value', []))
url = response_json.get('@odata.nextLink', None)
+ logger.info(f"Next page URL: {url}")
else:
response_data = response_json # Capture the full response for consistency
break
@@ -270,6 +274,48 @@ class SharePointClient:
return file_content
+ def download_sharepoint_folder(self, drive_id, folder_path, download_dir, excluded_file_types=None):
+ """
+ Downloads all files in a SharePoint folder to the specified local directory.
+
+ :param drive_id: The ID of the SharePoint drive.
+ :param folder_path: The path of the folder in SharePoint.
+ :param download_dir: The local directory to save the downloaded files.
+ :param excluded_file_types: A list of file types to exclude from download (default is None).
+ """
+
+ excluded_file_types = [] if excluded_file_types is None else excluded_file_types
+
+ # Ensure the download directory exists
+ os.makedirs(download_dir, exist_ok=True)
+
+ # List folder contents
+ folder_contents = self.list_folder_contents(drive_id, folder_path)
+ files = folder_contents.get('value', [])
+
+ for item in files:
+ if item.get('folder'): # Check if it's a folder
+ # Recursively handle subfolders
+ subfolder_path = f"{folder_path}/{item['name']}"
+ subfolder_dir = os.path.join(download_dir, item['name'])
+ self.download_sharepoint_folder(drive_id, subfolder_path, subfolder_dir)
+ else:
+ # It's a file, download it
+ file_name = item['name']
+ if file_name.split(".")[-1] in excluded_file_types:
+ continue
+ download_url = item['@microsoft.graph.downloadUrl']
+
+ logger.info(f"Downloading file: {file_name}")
+ file_content = self.download_sharepoint_file(download_url)
+
+ # Save the file locally
+ file_path = os.path.join(download_dir, file_name)
+ with open(file_path, 'wb') as f:
+ f.write(file_content.read())
+
+ logger.info(f"File saved to: {file_path}")
+
def app():
# Customers for WC 18/11/2024
diff --git a/etl/customers/cambridge/remote_assessment.py b/etl/customers/cambridge/remote_assessment.py
index 3f152e79..dc5beff5 100644
--- a/etl/customers/cambridge/remote_assessment.py
+++ b/etl/customers/cambridge/remote_assessment.py
@@ -21,10 +21,10 @@ def app():
"property_type": "House", "built-form": "Semi-Detached"
},
{
- "address": "21 High Street", "postcode": "CB23 8AB", "uprn": 100090136026
+ "address": "21 High Street", "postcode": "CB23 8AB", "uprn": 100090144815
},
{
- "address": "22 High Street", "postcode": "CB23 8AB", "uprn": 100090136027
+ "address": "22 High Street", "postcode": "CB23 8AB", "uprn": 100090144816
},
{
"address": "5 Bunkers Hill", "postcode": "CB3 0LY", "uprn": 10008078615
@@ -52,8 +52,8 @@ def app():
valuations_data = [
{'uprn': 100090136018, "valuation": 586_000},
- {'uprn': 100090136026, "valuation": 551_000},
- {'uprn': 100090136027, "valuation": 844_000},
+ {'uprn': 100090144815, "valuation": 446_000},
+ {'uprn': 100090144816, "valuation": 448_000},
{'uprn': 10008078615, "valuation": 763_000},
{'uprn': 10008078616, "valuation": 616_000},
{'uprn': 10008078617, "valuation": 593_000},
diff --git a/etl/customers/l_and_g/ic_asset_list.py b/etl/customers/l_and_g/ic_asset_list.py
new file mode 100644
index 00000000..d0966bdf
--- /dev/null
+++ b/etl/customers/l_and_g/ic_asset_list.py
@@ -0,0 +1,166 @@
+"""
+This script prepares the asset list for modelling the properties from the L&Q dataset, for their January IC
+"""
+
+import pandas as pd
+import numpy as np
+
+from etl.route_march_data_pull.app import get_data
+from utils.s3 import save_csv_to_s3
+
+PORTFOLIO_ID = 124
+USER_ID = 8
+
+
+def app():
+ asset_data = pd.read_excel(
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Basildon information for Domna/Basildon MDS v1.4 "
+ "(1).xlsx",
+ sheet_name="Basildon",
+ header=5
+ )
+
+ asset_data = asset_data.head(-3)
+
+ asset_data["address1"] = np.where(
+ pd.isnull(asset_data["Address 1"]),
+ asset_data["Address 2"],
+ asset_data["Address 1"]
+ )
+
+ asset_data["full_address"] = np.where(
+ pd.isnull(asset_data["Address 1"]),
+ asset_data["Address 2"] + ", " + asset_data["Address 3"],
+ asset_data["Address 1"] + ", " + asset_data["Address 2"] + ", " + asset_data["Address 3"],
+ )
+
+ asset_list = asset_data[["address1", "PostCode", "full_address", "Bedrooms"]]
+
+ asset_list = asset_list.reset_index(drop=True)
+
+ asset_list["row_id"] = asset_list.index
+
+ # L&G's focus:
+ # Measures: loft and cavity insulation, replacement thermally efficient windows, PV cells, AS heat pumps.
+
+ epc_data, errors, no_epc = get_data(
+ asset_list=asset_list,
+ fulladdress_column="full_address",
+ address1_column="address1",
+ postcode_column="PostCode",
+ manual_uprn_map={}
+ )
+
+ missed = asset_list[
+ asset_list["row_id"].isin(no_epc)
+ ]
+
+ # We merge on the property types, where we have them
+ missed = missed.merge(
+ asset_data[["address1", "PostCode", "Property Type"]],
+ how="left",
+ on=["address1", "PostCode"]
+ )
+ # Remap Block: Residential to Flat
+ missed["Property Type"] = np.where(
+ missed["Property Type"] == "Block: Residential",
+ "Flat",
+ missed["Property Type"]
+ )
+
+ # We create the asset list - we have some properties that genuninely never had an EPC
+
+ epc_df = pd.DataFrame(epc_data)
+ fetched_asset_list = epc_df[["address1", "postcode", "uprn", "row_id"]]
+ fetched_asset_list = fetched_asset_list.merge(
+ asset_list[["row_id", "Bedrooms"]],
+ how="left",
+ on=["row_id"]
+ )
+
+ missed = missed.rename(columns={"PostCode": "postcode"}).drop(columns=["row_id"])
+
+ # missed.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/missed_epcs.csv")
+ missed_uprns = pd.read_csv(
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/missed_epcs_uprn.csv",
+ )
+
+ missed = missed.merge(
+ missed_uprns[["address1", "postcode", "UPRN"]].rename(
+ columns={"UPRN": "uprn"},
+ ),
+ how="left",
+ on=["address1", "postcode"]
+ )
+
+ fetched_asset_list = fetched_asset_list.drop(columns=["row_id"])
+ # We concatename them
+ final_asset_list = pd.concat(
+ [fetched_asset_list, missed[["address1", "postcode", "Property Type", "Bedrooms", "uprn"]]]
+ )
+
+ final_asset_list = final_asset_list.rename(
+ columns={
+ "address1": "address",
+ "Property Type": "property_type",
+ "Bedrooms": "n_bedrooms"
+ }
+ )
+
+ # Finally, we merge on the numeber of bedrooms
+
+ # Extract the non-invasive recommendations:
+ non_invasive_recommendations = []
+ for x in epc_data:
+ non_invasive_recommendations.append(
+ {
+ "uprn": x["uprn"],
+ "recommendations": x["find_my_epc_data"]["recommendations"]
+ }
+ )
+
+ filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
+ save_csv_to_s3(
+ dataframe=pd.DataFrame(final_asset_list),
+ bucket_name="retrofit-plan-inputs-dev",
+ file_name=filename
+ )
+
+ # Store the non-invasive recommendations in s3
+ non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
+ save_csv_to_s3(
+ dataframe=pd.DataFrame(non_invasive_recommendations),
+ bucket_name="retrofit-plan-inputs-dev",
+ file_name=non_invasive_recommendations_filename
+ )
+
+ # Store the valuations data in s3
+ # valuations_filename = f"{USER_ID}/{PORTFOLIO_ID}/valuations.csv"
+ # save_csv_to_s3(
+ # dataframe=pd.DataFrame(valuations_data),
+ # bucket_name="retrofit-plan-inputs-dev",
+ # file_name=valuations_filename
+ # )
+
+ body = {
+ "portfolio_id": str(PORTFOLIO_ID),
+ "housing_type": "Private",
+ "goal": "Increasing EPC",
+ "goal_value": "A",
+ "trigger_file_path": filename,
+ "already_installed_file_path": "",
+ "patches_file_path": "",
+ "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
+ "valuation_file_path": "",
+ "scenario_name": "Retrofit Packages",
+ "multi_plan": True,
+ "budget": None,
+ "inclusions": [
+ "cavity_wall_insulation",
+ "loft_insulation",
+ "windows",
+ "solar_pv",
+ "air_source_heat_pump"
+ ]
+ }
+ print(body)
diff --git a/etl/customers/l_and_g/ic_slides.py b/etl/customers/l_and_g/ic_slides.py
new file mode 100644
index 00000000..72dfc2c0
--- /dev/null
+++ b/etl/customers/l_and_g/ic_slides.py
@@ -0,0 +1,243 @@
+import pandas as pd
+from backend.app.utils import sap_to_epc
+
+data = pd.read_csv(
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/basildon_age_breakdowns/property_202501170837.csv"
+)
+
+data["year_built"].value_counts()
+
+# 1950-1966 26
+# 1967-1975 37
+# 1976-1982 37
+# 1983-1990 33
+# 1991-1995 139
+# 1996-2002 42
+# 2003-2006 50
+
+data["full_property_type"] = data["property_type"] + ": " + data["built_form"]
+
+houses = data[data["property_type"].isin(["House", "Bungalow"])]
+houses["built_form"].value_counts()
+
+data["property_type"].value_counts()
+data["full_property_type"].value_counts()
+# House: Mid-Terrace 136
+# House: End-Terrace 83
+# House: Semi-Detached 55
+# Flat: Semi-Detached 24
+# Flat: End-Terrace 19
+# House: Detached 10
+# Flat: Mid-Terrace 9
+# Maisonette: Mid-Terrace 9
+# Maisonette: Semi-Detached 8
+# Maisonette: End-Terrace 6
+# Flat: Detached 4
+# Bungalow: Detached 1
+
+epc_data = pd.read_csv(
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/basildon_age_breakdowns/basildon EPC Data.csv"
+)
+
+# Classify floor area in <73m2, 73-98, 99-200, 200+
+epc_data["floor_area_bracket"] = epc_data["total_floor_area"].apply(
+ lambda x: "<73" if x < 73 else "73-98" if x < 99 else "99-200" if x < 200 else "200+")
+
+# 73-98 185
+# <73 156
+# 99-200 23
+
+epc_data["wall_type"] = epc_data["walls"].str.split(",").str[0]
+epc_data["wall_type"].value_counts()
+
+# Cavity wall 343
+# Timber frame 15
+# System built 6
+
+# we pull some additional data
+# We want:
+# 1) The list of properties included in the portfolio, with uprn
+# 2) The recommendations against each property with costs, and whether or not the recommendation was defaulted
+# 3) The properties without recommendations and why
+
+from tqdm import tqdm
+import pandas as pd
+import numpy as np
+from sqlalchemy.orm import sessionmaker
+from backend.app.db.connection import db_engine
+from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
+from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
+
+
+def get_data(portfolio_id, scenario_ids):
+ session = sessionmaker(bind=db_engine)()
+ session.begin()
+
+ # Get properties and their details for a specific portfolio
+ properties_query = session.query(
+ PropertyModel,
+ PropertyDetailsEpcModel
+ ).join(
+ PropertyDetailsEpcModel, PropertyModel.id == PropertyDetailsEpcModel.property_id
+ ).filter(
+ PropertyModel.portfolio_id == portfolio_id # Filter by portfolio ID
+ ).all()
+
+ # Transform properties data to include all fields dynamically
+ properties_data = [
+ {**{col.name: getattr(prop.PropertyModel, col.name) for col in PropertyModel.__table__.columns},
+ **{col.name: getattr(prop.PropertyDetailsEpcModel, col.name) for col in
+ PropertyDetailsEpcModel.__table__.columns}}
+ for prop in properties_query
+ ]
+
+ # Get property IDs from fetched properties
+
+ # Get plans linked to the fetched properties
+ plans_query = session.query(Plan).filter(Plan.scenario_id.in_(scenario_ids)).all()
+
+ # Transform plans data to include all fields dynamically
+ plans_data = [
+ {col.name: getattr(plan, col.name) for col in Plan.__table__.columns}
+ for plan in plans_query
+ ]
+
+ # Extract plan IDs for filtering recommendations through PlanRecommendations
+ plan_ids = [plan['id'] for plan in plans_data]
+
+ # Get recommendations through PlanRecommendations for those plans and that are default
+ recommendations_query = session.query(
+ Recommendation,
+ Plan.scenario_id
+ ).join(
+ PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id
+ ).join(
+ Plan, Plan.id == PlanRecommendations.plan_id # Join with Plan to access scenario_id
+ ).filter(
+ PlanRecommendations.plan_id.in_(plan_ids),
+ Recommendation.default == True # Filtering for default recommendations
+ ).all()
+
+ # Transform recommendations data to include all fields dynamically and include scenario_id
+ recommendations_data = [
+ {**{col.name: getattr(rec.Recommendation, col.name) if hasattr(rec, 'Recommendation') else getattr(rec,
+ col.name) for
+ col in Recommendation.__table__.columns},
+ "Scenario ID": rec.scenario_id}
+ for rec in recommendations_query
+ ]
+
+ session.close()
+
+ return properties_data, plans_data, recommendations_data
+
+
+properties_data, plans_data, recommendations_data = get_data(portfolio_id=124, scenario_ids=[199])
+
+properties_df = pd.DataFrame(properties_data)
+plans_df = pd.DataFrame(plans_data)
+recommendations_df = pd.DataFrame(recommendations_data)
+
+recommended_measures_df = recommendations_df[
+ ["property_id", "measure_type", "estimated_cost", "default"]
+]
+recommended_measures_df = recommended_measures_df[recommended_measures_df["default"]]
+recommended_measures_df = recommended_measures_df.drop(columns=["default"])
+
+post_install_sap = recommendations_df[["property_id", "default", "sap_points"]]
+post_install_sap = post_install_sap[post_install_sap["default"]]
+# Sum up the sap points by property id
+post_install_sap = post_install_sap.groupby("property_id")[["sap_points"]].sum().reset_index()
+
+recommendations_measures_pivot = recommended_measures_df.pivot(
+ index='property_id',
+ columns='measure_type',
+ values='estimated_cost'
+)
+recommendations_measures_pivot = recommendations_measures_pivot.reset_index()
+
+recommendations_measures_pivot = recommendations_measures_pivot.rename(
+ columns={
+ "air_source_heat_pump": "Cost: Air Source Heat Pump",
+ "cavity_wall_insulation": "Cost: Cavity Wall Insulation",
+ "double_glazing": "Cost: Double Glazing",
+ "loft_insulation": "Cost: Loft Insulation",
+ "mechanical_ventilation": "Cost: Ventilation",
+ "solar_pv": "Cost: Solar PV"
+ }
+)
+recommendations_measures_pivot = recommendations_measures_pivot.fillna(0)
+recommendations_measures_pivot["Recommendation: Air Source Heat Pump"] = (
+ recommendations_measures_pivot["Cost: Air Source Heat Pump"] > 0
+)
+recommendations_measures_pivot["Recommendation: Cavity Wall Insulation"] = (
+ recommendations_measures_pivot["Cost: Cavity Wall Insulation"] > 0
+)
+recommendations_measures_pivot["Recommendation: Double Glazing"] = (
+ recommendations_measures_pivot["Cost: Double Glazing"] > 0
+)
+recommendations_measures_pivot["Recommendation: Loft Insulation"] = (
+ recommendations_measures_pivot["Cost: Loft Insulation"] > 0
+)
+recommendations_measures_pivot["Recommendation: Ventilation"] = (
+ recommendations_measures_pivot["Cost: Ventilation"] > 0
+)
+recommendations_measures_pivot["Recommendation: Solar PV"] = (
+ recommendations_measures_pivot["Cost: Solar PV"] > 0
+)
+
+df = properties_df[
+ [
+ "property_id", "uprn", "address", "postcode", "property_type", "walls", "roof", "heating", "windows",
+ "current_epc_rating",
+ "current_sap_points", "total_floor_area", "number_of_rooms",
+ ]
+].merge(
+ recommendations_measures_pivot, how="left", on="property_id"
+).merge(
+ post_install_sap, how="left", on="property_id"
+)
+
+df = df.drop(columns=["property_id"])
+df["sap_points"] = df["sap_points"].fillna(0)
+
+df = df.rename(
+ columns={
+ "uprn": "UPRN",
+ "address": "Address",
+ "postcode": "Postcode",
+ "walls": "Walls",
+ "roof": "Roof",
+ "heating": "Heating",
+ "windows": "Windows",
+ "current_epc_rating": "Current EPC Rating",
+ "current_sap_points": "Current SAP Points",
+ "total_floor_area": "Total Floor Area",
+ "number_of_rooms": "Number of Habitable Rooms",
+ "floor_height": "Floor Height",
+ }
+)
+
+df["Has Recommendations"] = ~pd.isnull(df["Cost: Air Source Heat Pump"])
+
+# We fill missings:
+for col in [
+ "Recommendation: Air Source Heat Pump", "Recommendation: Cavity Wall Insulation",
+ "Recommendation: Double Glazing", "Recommendation: Loft Insulation", "Recommendation: Ventilation",
+ "Recommendation: Solar PV"
+]:
+ df[col] = df[col].fillna(False)
+
+for col in [
+ "Cost: Air Source Heat Pump", "Cost: Cavity Wall Insulation",
+ "Cost: Double Glazing", "Cost: Loft Insulation", "Cost: Ventilation",
+ "Cost: Solar PV"
+]:
+ df[col] = df[col].fillna(0)
+
+# Calculate post SAP
+df["Predicted Post Works SAP"] = df["Current SAP Points"] + df["sap_points"]
+df["Predicted Post Works SAP"] = df["Predicted Post Works SAP"].round()
+df["Predicted Post Works EPC"] = df["Predicted Post Works SAP"].apply(lambda x: sap_to_epc(x))
+
+df.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Basildon Data Export - 2.csv", index=False)
diff --git a/etl/customers/remote_assessments/app.py b/etl/customers/remote_assessments/app.py
index 59e0e868..13cdc41b 100644
--- a/etl/customers/remote_assessments/app.py
+++ b/etl/customers/remote_assessments/app.py
@@ -1,9 +1,15 @@
+import os
import pandas as pd
+from dotenv import load_dotenv
from utils.s3 import save_csv_to_s3
+from etl.find_my_epc.AssetListEpcData import AssetListEpcData
-PORTFOLIO_ID = 120
+PORTFOLIO_ID = 126
USER_ID = 8
+load_dotenv(dotenv_path="backend/.env")
+EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
+
def app():
"""
@@ -13,11 +19,23 @@ def app():
asset_list = [
{
- "uprn": 100030334057,
- "address": "5, Lynton Street",
- "postcode": "DE22 3RW"
+ "address": "Garden Flat, 48 Bedminster Parade",
+ "postcode": "BS3 4HS",
+ "building_id": 1,
+ "uprn": 308249,
+ },
+ {
+ "address": "Top Floor Flat, 48 Bedminster Parade",
+ "postcode": "BS3 4HS",
+ "building_id": 1,
+ "uprn": 308251
+ },
+ {
+ "address": "First Floor Flat, 48 Bedminster Parade",
+ "postcode": "BS3 4HS",
+ "building_id": 1,
+ "uprn": 308250,
}
-
]
asset_list = pd.DataFrame(asset_list)
@@ -29,40 +47,37 @@ def app():
file_name=filename
)
- non_invasive_recommendations = [
- {
- "uprn": 100030334057,
- "recommendations": [
- {
- "type": "internal_wall_insulation",
- "sap_points": 9,
- "survey": True
- },
- {
- "type": "external_wall_insulation",
- "sap_points": 9,
- "survey": True
- },
- {
- "type": "suspended_floor_insulation",
- "sap_points": 2,
- "survey": True
- }
- ]
- }
- ]
+ # Pull the non-invasive recommendations automatically
+ asset_list_epc_client = AssetListEpcData(
+ asset_list=asset_list,
+ epc_auth_token=EPC_AUTH_TOKEN
+ )
+ asset_list_epc_client.get_data()
+ asset_list_epc_client.get_non_invasive_recommendations()
+
# Store non-invasive recommendations in S3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
- dataframe=pd.DataFrame(non_invasive_recommendations),
+ dataframe=pd.DataFrame(asset_list_epc_client.non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
valuation_data = [
{
- "uprn": 100030334057,
- "value": 133_000
+ "address": "Garden Flat, 48 Bedminster Parade",
+ "postcode": "BS3 4HS",
+ "valuation": 337_000
+ },
+ {
+ "addresss": "Top Floor Flat, 48 Bedminster Parade",
+ "postcode": "BS3 4HS",
+ "valuation": 337_000
+ },
+ {
+ "address": "First Floor Flat, 48 Bedminster Parade",
+ "postcode": "BS3 4HS",
+ "valuation": 337_000
}
]
# Store valuation data to s3
diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py
index 0f757f7b..8538188b 100644
--- a/etl/customers/stonewater/Wave 3 Preparation.py
+++ b/etl/customers/stonewater/Wave 3 Preparation.py
@@ -2905,5 +2905,38 @@ def identify_incorrect_packages():
os.path.join(CUSTOMER_FOLDER_PATH, "Units with assigned packages - with flags.csv"), index=False
)
+
+def revised_model():
+ """
+ This function implements the revised model for Stonewater, where we are looking at new priority postcodes
+ This work was undertaken in January 2021.
+ """
+
+ # 1) Create the new list of properties
+
+ new_priority_postcodes = pd.read_excel(
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Jan 2025 Project/Updated 2025 to 2030 "
+ "priority list.xlsx"
+ )
+
+ original_archetypes = pd.read_excel(
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 "
+ "- Archetyped V3.1.xlsx",
+ header=4
+ )
+ original_archetypes = original_archetypes[~pd.isnull(original_archetypes["Address ID"])]
+ original_archetypes = original_archetypes[original_archetypes["Address ID"] != "Address ID"]
+ original_archetypes["Address ID"] = original_archetypes["Address ID"].astype(int)
+
+ original_archetypes = original_archetypes[
+ ["Address ID", "Archetype ID", ""]
+ ]
+
+ # Check if we have all of the addresses
+ missed = original_archetypes[
+ ~original_archetypes["Address ID"].isin(new_priority_postcodes["Address ID"].values)
+ ]["Archetype ID"].unique()
+ assert
+
# if __name__ == "__main__":
# main()
diff --git a/etl/customers/stonewater/data_cleaning.py b/etl/customers/stonewater/data_cleaning.py
new file mode 100644
index 00000000..8751960c
--- /dev/null
+++ b/etl/customers/stonewater/data_cleaning.py
@@ -0,0 +1,137 @@
+import os
+import shutil
+from tqdm import tqdm
+
+
+def delete_large_files():
+ """
+ This function deletes photos, designs and other files which we don't need
+ :return:
+ """
+
+ folder_path = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Wave 2.1 Surveys"
+
+ # List the contents of this folder since in each sub-folder we have the property folders
+ contents = os.listdir(folder_path)
+
+ for subfolder in contents:
+ if not os.path.isdir(os.path.join(folder_path, subfolder)):
+ continue
+ subfolder_path = os.path.join(folder_path, subfolder)
+ # List the contents
+ property_folders = os.listdir(subfolder_path)
+
+ for property in tqdm(property_folders):
+ # Check if it's a directory
+ if not os.path.isdir(os.path.join(subfolder_path, property)):
+ continue
+
+ property_path = os.path.join(subfolder_path, property)
+ property_contents = os.listdir(property_path)
+ # We delete the contents of the following folders:
+ # '1. RA Property Pics'
+ # '4. Air Tightness Tests'
+ # '5. RD Design Info'
+ for folder_to_delete in ["1. RA Property Pics", "4. Air Tightness Tests", "5. RD Design Info",
+ "1. RA Property PIcs", "Post EPC Photos", "4. RD Design Info",
+ "5. Installer Info", "6. Trustmark lodgement", "7.Post Install Inspection Photos",
+ "6. Trustmark Lodgement", "7. Post Inspection Photos"]:
+ if folder_to_delete not in property_contents:
+ continue
+ folder_to_delete_path = os.path.join(property_path, folder_to_delete)
+ if os.path.isdir(folder_to_delete_path):
+ # Delete the folder, even if it's not empty
+ shutil.rmtree(folder_to_delete_path)
+
+ # We now check the '2. RA Coordinator Info' folder for any .MOV files and delete them
+ if "2. RA Coordinator Info" not in property_contents:
+ coordinator_folder = "1. RA Coordinator Info"
+ else:
+ coordinator_folder = "2. RA Coordinator Info"
+ coordinator_info_path = os.path.join(property_path, coordinator_folder)
+ coordinator_info_contents = os.listdir(coordinator_info_path)
+ # Look for .MOV files and .jpg files
+ for file in coordinator_info_contents:
+ if file.endswith(".MOV"):
+ os.remove(os.path.join(coordinator_info_path, file))
+
+ if file.endswith(".jpg"):
+ os.remove(os.path.join(coordinator_info_path, file))
+
+ if "Property Pics" in coordinator_info_contents:
+ # Delete folder and contents
+ shutil.rmtree(os.path.join(coordinator_info_path, "Property Pics"))
+
+
+def download_data_from_sharepoint():
+ # Given a sharepoint location, this function will download the retrofit assessment folders from the locations
+ # specified in the sharepoint location
+ from etl.access_reporting.app import SharePointClient
+
+ sharepoint_client = SharePointClient(
+ tenant_id="10d5af8b-2cfd-4882-9ccd-b96e4812dacf",
+ client_id="6832a4c5-fb8c-4082-a746-4f51e1020f0d",
+ client_secret="xpC8Q~Frww48SM1V-D8lGy5iOY7P_cJ7FF3jgarQ",
+ site_id="bc925a9a-ad0b-4de9-9a3c-e61014cc7489"
+ )
+
+ # Retrieve the data from Sharepoint and write to local machine
+ contents = sharepoint_client.list_folder_contents(
+ drive_id=sharepoint_client.document_drive["id"],
+ folder_path="Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders"
+ )
+
+ len(contents["value"])
+ folders_to_pull = [
+ folder for folder in contents["value"] if folder["name"] in ["3. Wiltshire", "4. Bournemouth", "5. Coventry"]
+ ]
+ for folder_to_pull in folders_to_pull:
+ # Get the contents
+ folder_contents = sharepoint_client.list_folder_contents(
+ drive_id=sharepoint_client.document_drive["id"],
+ folder_path="Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders" + "/" +
+ folder_to_pull["name"],
+ page_size=100
+ )
+
+ property_folders = [f for f in folder_contents["value"]]
+
+ for property_folder in property_folders:
+ # We go into each property folder and get the contents
+ property_folder_contents = sharepoint_client.list_folder_contents(
+ drive_id=sharepoint_client.document_drive["id"],
+ folder_path="Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders" + "/" +
+ folder_to_pull["name"] + "/" + property_folder["name"]
+ )
+ # We look for the retrofit assessment folder:
+ property_sub_folders = [
+ f for f in property_folder_contents["value"] if "ra coordinator info" in f["name"].lower()
+ ]
+
+ if not property_sub_folders:
+ continue
+
+ # if we have this, we download the folder and store it on my laptop!
+ property_sub_folder = property_sub_folders[0]
+
+ property_folder_path = os.path.join(
+ "Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders",
+ folder_to_pull["name"],
+ property_folder["name"],
+ property_sub_folder["name"]
+ )
+
+ download_dir = os.path.join(
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Wave 2.1 Surveys",
+ folder_to_pull["name"],
+ property_folder["name"],
+ property_sub_folder["name"]
+ )
+
+ # We download the folder
+ sharepoint_client.download_sharepoint_folder(
+ drive_id=sharepoint_client.document_drive["id"],
+ folder_path=property_folder_path,
+ download_dir=download_dir,
+ excluded_file_types=["MOV"]
+ )
diff --git a/etl/customers/stonewater/potential_eco_properties.py b/etl/customers/stonewater/potential_eco_properties.py
index c0301e9a..bda9c30c 100644
--- a/etl/customers/stonewater/potential_eco_properties.py
+++ b/etl/customers/stonewater/potential_eco_properties.py
@@ -7,6 +7,8 @@ from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from utils.s3 import read_from_s3, read_pickle_from_s3
+import msoffcrypto
+from io import BytesIO
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
@@ -64,6 +66,28 @@ def app():
This code creates a list of cavity properties, for review
"""
+ # Read in the password protected master
+ # TODO: This file should be deleted!
+
+ # Path to the password-protected Excel file
+ file_path = ("/Users/khalimconn-kowlessar/Downloads/STONEWATER MASTER SHEET - UPDATED 20.5.24 - K- PASSWORD "
+ "PROTECTED.xlsx")
+ password = "STONE123" # Replace with the actual password
+
+ # Open the file and decrypt it
+ with open(file_path, "rb") as f:
+ decrypted_file = BytesIO()
+ office_file = msoffcrypto.OfficeFile(f)
+ office_file.load_key(password=password)
+ office_file.decrypt(decrypted_file)
+
+ # Read the decrypted file into a DataFrame
+ eco_rolling_master = pd.read_excel(decrypted_file, sheet_name="Sheet1", engine="openpyxl")
+
+ eco_rolling_master = eco_rolling_master[
+ ~eco_rolling_master['INSTALL/CANCELLATION DATE'].str.contains("CANCELLED")
+ ]
+
archetyped_properties = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 - "
"Archetyped V3.1.xlsx",
@@ -116,13 +140,16 @@ def app():
features_to_merge = features[
[
- "Address ID", "Age", "Property Type", "Walls", "Roofs", "Glazing", "Heating", "Main Fuel", "Hot Water",
+ "Address ID", "Organisation Reference", "Age", "Property Type", "Walls", "Roofs", "Glazing", "Heating",
+ "Main Fuel",
+ "Hot Water",
"Renewables", "Total Floor Area"
]
]
stonewater_cavity_properties = archetyped_properties[
- ["Name", "Postcode", "Osm. ID", "Address ID", "UPRN", "UDPRN", "Archetype ID", "House no", "Street name",
+ ["Name", "Postcode", "Osm. ID", "Org. ref.", "Address ID", "UPRN", "UDPRN", "Archetype ID", "House no",
+ "Street name",
"Address line 2", "City/Town", "Is Cavity Property", "Survey shows CWI needed for Archetype"]
].merge(
features_to_merge, how="left", on="Address ID"
@@ -166,77 +193,137 @@ def app():
stonewater_cavity_properties["Reason Included"]
)
+ # We flag units that were installed under ECO3
+ numeric_ids = eco_rolling_master[eco_rolling_master["STONEWATER UPRN"] != "NOT ON ASSET LIST"]
+ numeric_ids = numeric_ids[~pd.isnull(numeric_ids["STONEWATER UPRN"])]
+ numeric_ids["STONEWATER UPRN"] = numeric_ids["STONEWATER UPRN"].astype(int)
+
+ stonewater_cavity_properties["Installed under ECO3"] = stonewater_cavity_properties["Org. ref."].isin(
+ numeric_ids['STONEWATER UPRN'].values
+ )
+
+ # Which postcodes were installed under ECO3
+ priority_list_eco3 = stonewater_cavity_properties[
+ stonewater_cavity_properties["Installed under ECO3"]
+ ]["Postcode"].unique()
+
+ # These are properties that were not installed under ECO3, that have the same postcodes as properties
+ # installed under ECO3
+
+ # These are 66 properties we might want to start with as an immediate priority
+ stonewater_cavity_properties["Same Postcode as Installed under ECO3"] = (
+ ~stonewater_cavity_properties["Installed under ECO3"] & (
+ stonewater_cavity_properties["Postcode"].isin(priority_list_eco3)
+ )
+ )
+
# We get the EPC data
- epc_data = json.loads(
- read_from_s3(
- bucket_name="retrofit-data-dev",
- s3_file_name="customers/Stonewater/clustering/epc_data.json"
- )
- )
- epc_data = pd.DataFrame(epc_data)
-
- epc_data["uprn"] = np.where(
- epc_data["internal_id"] == 1091,
- 83143766,
- epc_data["uprn"]
- )
-
- epc_data_batch_2 = read_pickle_from_s3(
- s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
- bucket_name="retrofit-data-dev"
- )
- epc_data_batch_2 = pd.DataFrame(epc_data_batch_2)
-
- complete_epcs = pd.concat([epc_data, epc_data_batch_2])
-
- epcs_to_merge = complete_epcs[
- [
- "uprn",
- "address",
- "postcode",
- "property-type",
- "built-form",
- "inspection-date",
- "current-energy-rating",
- "current-energy-efficiency",
- "roof-description",
- "walls-description",
- "transaction-type",
- "secondheat-description",
- "total-floor-area",
- "construction-age-band",
- "floor-height",
- "number-habitable-rooms",
- "mainheat-description",
- "energy-consumption-current"
- ]
- ].rename(
- columns={
- "address": "Address",
- "postcode": "Postcode",
- "inspection-date": "Date of last EPC",
- "current-energy-efficiency": "SAP score on register",
- "current-energy-rating": "EPC rating on register",
- "property-type": "Property Type",
- "built-form": "Archetype",
- "total-floor-area": "Property Floor Area",
- "construction-age-band": "Property Age Band",
- "floor-height": "Property Floor Height",
- "number-habitable-rooms": "Number of Habitable Rooms",
- "walls-description": "Wall Construction",
- "roof-description": "Roof Construction",
- "mainheat-description": "Heating Type",
- "secondheat-description": "Secondary Heating",
- "transaction-type": "Reason for last EPC",
- "energy-consumption-current": "Heat Demand (kWh/m2)",
- }
- )
- # We de-dupe, taking the newest on the date the EPC was lod
- epcs_to_merge["Date of last EPC"] = pd.to_datetime(epcs_to_merge["Date of last EPC"])
- epcs_to_merge = epcs_to_merge.sort_values("Date of last EPC", ascending=False)
- epcs_to_merge = epcs_to_merge.drop_duplicates(subset="uprn")
+ # epc_data = json.loads(
+ # read_from_s3(
+ # bucket_name="retrofit-data-dev",
+ # s3_file_name="customers/Stonewater/clustering/epc_data.json"
+ # )
+ # )
+ # epc_data = pd.DataFrame(epc_data)
+ #
+ # epc_data["uprn"] = np.where(
+ # epc_data["internal_id"] == 1091,
+ # 83143766,
+ # epc_data["uprn"]
+ # )
+ #
+ # epc_data_batch_2 = read_pickle_from_s3(
+ # s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
+ # bucket_name="retrofit-data-dev"
+ # )
+ # epc_data_batch_2 = pd.DataFrame(epc_data_batch_2)
+ #
+ # complete_epcs = pd.concat([epc_data, epc_data_batch_2])
+ #
+ # epcs_to_merge = complete_epcs[
+ # [
+ # "uprn",
+ # "address",
+ # "postcode",
+ # "property-type",
+ # "built-form",
+ # "inspection-date",
+ # "current-energy-rating",
+ # "current-energy-efficiency",
+ # "roof-description",
+ # "walls-description",
+ # "transaction-type",
+ # "secondheat-description",
+ # "total-floor-area",
+ # "construction-age-band",
+ # "floor-height",
+ # "number-habitable-rooms",
+ # "mainheat-description",
+ # "energy-consumption-current"
+ # ]
+ # ].rename(
+ # columns={
+ # "address": "Address",
+ # "postcode": "Postcode",
+ # "inspection-date": "Date of last EPC",
+ # "current-energy-efficiency": "SAP score on register",
+ # "current-energy-rating": "EPC rating on register",
+ # "property-type": "Property Type",
+ # "built-form": "Archetype",
+ # "total-floor-area": "Property Floor Area",
+ # "construction-age-band": "Property Age Band",
+ # "floor-height": "Property Floor Height",
+ # "number-habitable-rooms": "Number of Habitable Rooms",
+ # "walls-description": "Wall Construction",
+ # "roof-description": "Roof Construction",
+ # "mainheat-description": "Heating Type",
+ # "secondheat-description": "Secondary Heating",
+ # "transaction-type": "Reason for last EPC",
+ # "energy-consumption-current": "Heat Demand (kWh/m2)",
+ # }
+ # )
+ # # We de-dupe, taking the newest on the date the EPC was lod
+ # epcs_to_merge["Date of last EPC"] = pd.to_datetime(epcs_to_merge["Date of last EPC"])
+ # epcs_to_merge = epcs_to_merge.sort_values("Date of last EPC", ascending=False)
+ # epcs_to_merge = epcs_to_merge.drop_duplicates(subset="uprn")
stonewater_cavity_properties["UPRN"] = stonewater_cavity_properties["UPRN"].astype("Int64").astype(str)
+ stonewater_cavity_properties["Reason Included"].value_counts()
+ # Find the postcodes where an Osmosis survey revealed a need for CWI
+ postcodes_found_needing_cwi = stonewater_cavity_properties[
+ stonewater_cavity_properties["Reason Included"].isin(
+ [
+ "Survey revealed potential need for CWI or extract and re-fill",
+ "Surveyed revealed potential need for CWI or extract and re-fill and is an as built cavity property",
+ "Survey showed this property needs CWI",
+ "Survey showed this property could need extract and re-fill"
+ ]
+ )
+ ]["Postcode"].unique()
+
+ stonewater_cavity_properties["Suspected Needs CWI - not surveyed"] = (
+ (
+ stonewater_cavity_properties[
+ "Postcode"].isin(
+ postcodes_found_needing_cwi)
+ ) & (
+ ~stonewater_cavity_properties[
+ "Reason Included"].isin(
+ [
+ "Survey revealed potential need "
+ "for CWI or extract and re-fill",
+ "Surveyed revealed potential "
+ "need for CWI or extract and "
+ "re-fill and is an as built "
+ "cavity property",
+ "Survey showed this property "
+ "needs CWI",
+ "Survey showed this property "
+ "could need extract and re-fill"
+ ]
+ )
+ )
+ )
# Merge the EPCs on, with the data we need
stonewater_cavity_properties = stonewater_cavity_properties.rename(
@@ -252,12 +339,12 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
- ).merge(
- epcs_to_merge,
- how="left",
- left_on="UPRN",
- right_on="uprn"
- )
+ ) # .merge(
+ # epcs_to_merge,
+ # how="left",
+ # left_on="UPRN",
+ # right_on="uprn"
+ # )
# We now flag the additional properties in the as built list
@@ -288,8 +375,56 @@ def app():
additional_properties = additional_properties.merge(house_numbers, how="left", on="Address ID")
additional_properties["row_id"] = additional_properties["Address ID"].copy()
+ # Flag any units in this list that were installed under ECO3
+ additional_properties["Installed under ECO3"] = additional_properties["Organisation Reference"].isin(
+ numeric_ids['STONEWATER UPRN'].values
+ )
+
+ # Additional list ECO3
+ additional_list_eco3 = additional_properties[additional_properties["Installed under ECO3"]]["Postcode"].unique()
+
+ # These are properties that were not installed under ECO3, that have the same postcodes as properties
+ # installed under ECO3
+ # These are 297 properties we might want to start with as an immediate priority
+ additional_properties["Same Postcode as Installed under ECO3"] = (
+ ~additional_properties["Installed under ECO3"] & (
+ additional_properties["Postcode"].isin(additional_list_eco3)
+ )
+ )
+
+ # We do some additional manual checks, for ECO3 properties that were installed that didn't get matched to either
+ # dataaset
+ numeric_ids["In asset list"] = numeric_ids["STONEWATER UPRN"].isin(
+ stonewater_cavity_properties['Org. ref.'].astype(int).values
+ )
+ numeric_ids["In asset list"] = numeric_ids["In asset list"] | (
+ numeric_ids["STONEWATER UPRN"].isin(
+ additional_properties['Organisation Reference'].astype(int).values
+ )
+ )
+
+ # eco3_installs_not_in_asset_list = numeric_ids[~numeric_ids["In asset list"]]
+ # # We now take samples of properties randomly and manually check the ID against the asset list
+ # print(eco3_installs_not_in_asset_list.sample(1)[["STONEWATER UPRN", "Post Code", "NO ", "Street / Block Name", ]])
+ # # Checked STONEWATER UPRN
+ # # 9862, BH15 1NR, 33, THE QUAY FOYER [x]
+ # # 12785, S01 66PN, 57, SEACOLE GARDENS [x]
+ # # 26071, MK42 0TE, 51, De Havilland Avenue, Shortstown [x]
+ # # 18213, HR6 9UW, 20 Ford Street [x]
+ # # 24344, LU4 9FF, 6 SEAL CLOSE [x]
+ # # 31222, SN14 0QZ, 7 HARDBROOK COURT [x]
+ # # 9343, SP4 7XL, 10 OAK PLACE [x]
+ # # 34730, LU5 5TN, 4 TUDOR DRIVE [x]
+ # # 7021, BN27 2BZ, 32 BUTTS FIELD []
+ #
+ # stonewater_cavity_properties[stonewater_cavity_properties['Org. ref.'] == 7021]
+ # stonewater_cavity_properties[stonewater_cavity_properties['Postcode'] == "BN27 2BZ"]["Name"]
+ #
+ # additional_properties[additional_properties['Organisation Reference'] == 7021]
+ # additional_properties[additional_properties['Postcode'] == "BN27 2BZ"][["Address"]]
+
# Pull the EPCs for these properties
- additional_properties_epcs, errors = get_data(additional_properties)
+ # additional_properties_epcs, errors = get_data(additional_properties)
# Save this data as a pickle
# import pickle
@@ -297,12 +432,20 @@ def app():
# "wb") as f:
# pickle.dump(additional_properties_epcs, f)
+ additional_properties["Suspected Needs CWI - not surveyed"] = (
+ (
+ additional_properties["Postcode"].isin(postcodes_found_needing_cwi)
+ )
+ )
+
+ additional_properties["Same Postcode as Installed under ECO3"].value_counts()
+
# We drop Full Address
additional_properties = additional_properties.drop(columns=["Full Address"])
additional_properties2 = additional_properties[[
- "row_id", "Address", "Postcode", "Address ID", "SAP", "SAP Band", "Property Type", "Walls", "Roofs", "Glazing",
- "Heating", "Main Fuel", "Hot Water", "Renewables", "Total Floor Area",
-
+ "Address", "Postcode", "Address ID", "SAP", "SAP Band", "Property Type", "Walls", "Roofs", "Glazing",
+ "Heating", "Main Fuel", "Hot Water", "Renewables", "Total Floor Area", 'Installed under ECO3',
+ 'Same Postcode as Installed under ECO3'
]].rename(
columns={
"SAP": "Parity - Predicted SAP",
@@ -318,56 +461,58 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
- ).merge(
- pd.DataFrame(additional_properties_epcs)[
- [
- "row_id",
- "property-type",
- "built-form",
- "inspection-date",
- "current-energy-rating",
- "current-energy-efficiency",
- "roof-description",
- "walls-description",
- "transaction-type",
- "secondheat-description",
- "total-floor-area",
- "construction-age-band",
- "floor-height",
- "number-habitable-rooms",
- "mainheat-description",
- "energy-consumption-current"
- ]
- ].rename(
- columns={
- "inspection-date": "Date of last EPC",
- "current-energy-efficiency": "SAP score on register",
- "current-energy-rating": "EPC rating on register",
- "property-type": "Property Type",
- "built-form": "Archetype",
- "total-floor-area": "Property Floor Area",
- "construction-age-band": "Property Age Band",
- "floor-height": "Property Floor Height",
- "number-habitable-rooms": "Number of Habitable Rooms",
- "walls-description": "Wall Construction",
- "roof-description": "Roof Construction",
- "mainheat-description": "Heating Type",
- "secondheat-description": "Secondary Heating",
- "transaction-type": "Reason for last EPC",
- "energy-consumption-current": "Heat Demand (kWh/m2)",
- }
- ),
- how="left",
- on="row_id"
- )
+ ) # .merge(
+ # pd.DataFrame(additional_properties_epcs)[
+ # [
+ # "row_id",
+ # "property-type",
+ # "built-form",
+ # "inspection-date",
+ # "current-energy-rating",
+ # "current-energy-efficiency",
+ # "roof-description",
+ # "walls-description",
+ # "transaction-type",
+ # "secondheat-description",
+ # "total-floor-area",
+ # "construction-age-band",
+ # "floor-height",
+ # "number-habitable-rooms",
+ # "mainheat-description",
+ # "energy-consumption-current"
+ # ]
+ # ].rename(
+ # columns={
+ # "inspection-date": "Date of last EPC",
+ # "current-energy-efficiency": "SAP score on register",
+ # "current-energy-rating": "EPC rating on register",
+ # "property-type": "Property Type",
+ # "built-form": "Archetype",
+ # "total-floor-area": "Property Floor Area",
+ # "construction-age-band": "Property Age Band",
+ # "floor-height": "Property Floor Height",
+ # "number-habitable-rooms": "Number of Habitable Rooms",
+ # "walls-description": "Wall Construction",
+ # "roof-description": "Roof Construction",
+ # "mainheat-description": "Heating Type",
+ # "secondheat-description": "Secondary Heating",
+ # "transaction-type": "Reason for last EPC",
+ # "energy-consumption-current": "Heat Demand (kWh/m2)",
+ # }
+ # ),
+ # how="left",
+ # on="row_id"
+ # )
# We save the data locally
stonewater_cavity_properties.to_csv(
- "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties.csv",
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties - priority "
+ "postcodes.csv",
index=False
)
additional_properties2.to_csv(
- "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties.csv",
+ "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties - "
+ "non-priority postcodes.csv",
index=False
)
# Save the survey findings
diff --git a/etl/customers/waltham_forest/whlg eligibile properties.py b/etl/customers/waltham_forest/whlg eligibile properties.py
index fee988c1..9e1949f7 100644
--- a/etl/customers/waltham_forest/whlg eligibile properties.py
+++ b/etl/customers/waltham_forest/whlg eligibile properties.py
@@ -44,6 +44,10 @@ epc_data["has_conservation_restrictions"] = (
| (epc_data["is_heritage_building"] == True)
)
+whlg_eligible_postcodes["Local Authority"].value_counts()
+
+whlg_eligible_postcodes = whlg_eligible_postcodes[whlg_eligible_postcodes["Local Authority"] == "Waltham Forest"]
+
# Pathway 1:
# Match based on eligible postcodes
pathway1 = epc_data[epc_data["postcode"].isin(whlg_eligible_postcodes["Postcode"].values)]
@@ -67,6 +71,10 @@ pathway1["EPC Date"] = pd.to_datetime(pathway1["EPC Date"]).dt.strftime("%Y-%m-%
# Create a year EPC was lodged
pathway1["EPC Year"] = pd.to_datetime(pathway1["EPC Date"]).dt.year
+low_epc = pathway1[pathway1["EPC Rating"].isin(["F", "G"])]
+low_epc["EPC Rating"].value_counts()
+low_epc.tail(1)[["address", "postcode"]]
+
pathway1.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Waltham Forest WHLG - Pathway 1 Eligibility.csv",
index=False
diff --git a/etl/find_my_epc/AssetListEpcData.py b/etl/find_my_epc/AssetListEpcData.py
new file mode 100644
index 00000000..bce8cd1f
--- /dev/null
+++ b/etl/find_my_epc/AssetListEpcData.py
@@ -0,0 +1,94 @@
+import time
+import pandas as pd
+from tqdm import tqdm
+from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
+from backend.SearchEpc import SearchEpc
+from utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+class AssetListEpcData:
+
+ def __init__(self, asset_list: pd.DataFrame, epc_auth_token: str):
+
+ """
+ This class handles pulling data assocaited to an asset list and performs common functions like
+ getting EPC api data, retrieveing data form the find my epc website and extracting non-intrusive
+ recommendations
+ :param asset_list:
+ """
+
+ # Check the asset list contains the correct columns
+
+ self.asset_list = self.check_asset_list(asset_list)
+ self.epc_auth_token = epc_auth_token
+
+ self.extracted_data = None
+ self.non_invasive_recommendations = None
+
+ @staticmethod
+ def check_asset_list(asset_list):
+ # TODO: Update this with pydantic
+
+ return asset_list
+
+ def get_non_invasive_recommendations(self):
+
+ """
+ Extracts non-invasive recommendations in a format that can be used by the engine
+ :return:
+ """
+
+ if self.extracted_data is None:
+ raise ValueError("Please run get_data first")
+
+ self.non_invasive_recommendations = [
+ {
+ "uprn": r.get("uprn"),
+ "address": r["address"],
+ "postcode": r["postcode"],
+ "recommendations": r["recommendations"]
+ } for r in self.extracted_data
+ ]
+
+ def get_data(self):
+
+ logger.info("Retrieving data for given asset list")
+
+ # Pull the additional data
+ extracted_data = []
+ for _, home in tqdm(self.asset_list.iterrows(), total=len(self.asset_list)):
+ add1 = home["address"]
+ pc = home["postcode"]
+ # Retrieve the EPC data
+ epc_searcher = SearchEpc(
+ address1=add1,
+ postcode=pc,
+ uprn=home.get("uprn"),
+ auth_token=self.epc_auth_token,
+ os_api_key=""
+ )
+ epc_searcher.find_property(skip_os=True)
+ if epc_searcher.newest_epc is None:
+ continue
+
+ find_epc_searcher = RetrieveFindMyEpc(
+ address=epc_searcher.newest_epc["address1"],
+ postcode=epc_searcher.newest_epc["postcode"]
+ )
+ find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
+ time.sleep(0.5)
+ # We need uprn
+
+ extracted_data.append(
+ {
+ "uprn": home.get("uprn"),
+ "address": home["address"],
+ "postcode": home["postcode"],
+ **find_epc_data,
+ }
+ )
+
+ self.extracted_data = extracted_data
+ logger.info("Data Extrction complete")
diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py
index 5ea35a64..f93a5a73 100644
--- a/etl/find_my_epc/RetrieveFindMyEpc.py
+++ b/etl/find_my_epc/RetrieveFindMyEpc.py
@@ -263,7 +263,7 @@ class RetrieveFindMyEpc:
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Change heating to gas condensing boiler": ["boiler_upgrade"],
- "Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heaters"],
+ "Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heater"],
"Flat roof or sloping ceiling insulation": ["flat_roof_insulation"],
"Heating controls (room thermostat)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
@@ -291,7 +291,7 @@ class RetrieveFindMyEpc:
"PV Cells recommendation": [],
"Replacement glazing units": ["double_glazing"],
"Heating controls (time and temperature zone control)": ["time_temperature_zone_control"],
- "High heat retention storage heaters": ["high_heat_retention_storage_heaters"],
+ "High heat retention storage heaters": ["high_heat_retention_storage_heater"],
"Gas condensing boiler": ["boiler_upgrade"],
"Change room heaters to condensing boiler": ["boiler_upgrade"],
"Cylinder thermostat": ["cylinder_thermostat"],
@@ -300,6 +300,8 @@ class RetrieveFindMyEpc:
"Fan assisted storage heaters": [],
"Fan-assisted storage heaters": [],
"Step 1:": [],
+ "Step 2:": [],
+ 'Step 3:': [],
"Biomass stove with boiler": [],
"Replace boiler with biomass boiler": [],
"Heating controls (room thermostat and thermostatic radiator valves)": [
@@ -308,7 +310,14 @@ class RetrieveFindMyEpc:
"Heating controls (programmer, and thermostatic radiator valves)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
- "Replacement warm air unit": []
+ "Heating controls (programmer and TRVs)": [
+ "roomstat_programmer_trvs", "time_temperature_zone_control"
+ ],
+ "Heating controls (programmer and room thermostat)": [
+ "roomstat_programmer_trvs", "time_temperature_zone_control"
+ ],
+ "Replacement warm air unit": [],
+ "Secondary glazing": ["secondary_glazing"]
}
survey = True
diff --git a/etl/funding/app.py b/etl/funding/app.py
new file mode 100644
index 00000000..fba48ca4
--- /dev/null
+++ b/etl/funding/app.py
@@ -0,0 +1,35 @@
+"""
+This scipt prepares the data, required for us to perform funding calculations. The starting data should be stored
+on the machine this is being run on, and this will prepare the information and upload if
+"""
+import pandas as pd
+from utils.s3 import save_csv_to_s3
+
+STAGE = "dev"
+DATA_BUCKET = "retrofit-data-{stage}"
+PROJECTS_SCORES_MATRIX_LOCATION = "/Users/khalimconn-kowlessar/Downloads/ECO4 Full Project Scores Matrix.csv"
+WHLG_ELIGIBLE_POSTCODES = "/Users/khalimconn-kowlessar/Downloads/WHLG-eligible-postcodes.xlsx"
+
+
+def app():
+ # Read in the project scores matrix
+ project_scores_matrix = pd.read_csv(PROJECTS_SCORES_MATRIX_LOCATION)
+
+ # Store in AWS S3
+ save_csv_to_s3(
+ dataframe=project_scores_matrix,
+ bucket_name=DATA_BUCKET.format(stage=STAGE),
+ file_name="funding/ECO4 Full Project Scores Matrix.csv"
+ )
+
+ # Read in the Warm Homes Local Grant eligible postcodes data
+ whlg_eligible_postcodes = pd.read_excel(WHLG_ELIGIBLE_POSTCODES, sheet_name="Eligible postcodes", header=1)
+ # We tidy up the data before we store
+ whlg_eligible_postcodes = whlg_eligible_postcodes[["Postcode"]]
+ whlg_eligible_postcodes["Postcode"] = whlg_eligible_postcodes["Postcode"].str.lower()
+
+ save_csv_to_s3(
+ dataframe=whlg_eligible_postcodes,
+ bucket_name=DATA_BUCKET.format(stage=STAGE),
+ file_name="funding/whlg eligible postcodes.csv"
+ )
diff --git a/etl/route_march_data_pull/app.py b/etl/route_march_data_pull/app.py
index 9ed55185..8d19aa84 100644
--- a/etl/route_march_data_pull/app.py
+++ b/etl/route_march_data_pull/app.py
@@ -1,6 +1,5 @@
import os
import time
-from idlelib.iomenu import errors
import pandas as pd
import numpy as np
@@ -25,12 +24,11 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
epc_data = []
errors = []
no_epc = []
- # home = asset_list[asset_list["row_id"] == errors[5]].squeeze()
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
try:
postcode = home[postcode_column]
- house_number = home[address1_column]
- full_address = home[fulladdress_column]
+ house_number = home[address1_column].strip()
+ full_address = home[fulladdress_column].strip()
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
@@ -58,7 +56,13 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
- add1 = full_address.split(",")[1].strip()
+ add1 = full_address.split(",")
+ if len(add1) > 1:
+ add1 = add1[1].strip()
+ else:
+ # Try splitting on space
+ add1 = full_address.split(" ")[0].strip()
+
else:
add1 = str(house_number)
searcher = SearchEpc(
@@ -128,6 +132,10 @@ def extract_address1(asset_list, full_address_col, method="first_two_words"):
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
+ if method == "first_word":
+ asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0]
+ return asset_list
+
raise ValueError(f"Method {method} not recognized")
@@ -154,17 +162,19 @@ def app():
Property UPRN
"""
- DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Watford"
- DATA_FILENAME = "JS Mailing List 10122024.xlsx"
- SHEET_NAME = "Export"
- POSTCODE_COLUMN = "Postcode"
- FULLADDRESS_COLUMN = "Property Address"
- ADDRESS1_COLUMN = "Address Line 1"
- ADDRESS1_METHOD = None
+ DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Southern"
+ DATA_FILENAME = "January 2025 Additions Query.xlsx"
+ SHEET_NAME = "Jan 2025 additions"
+ POSTCODE_COLUMN = "Post Code"
+ FULLADDRESS_COLUMN = "Street / Block Name"
+ ADDRESS1_COLUMN = None
+ ADDRESS1_METHOD = "first_word"
ADDRESS_COLS_TO_CONCAT = []
# Maps addresses to uprn in problematic cases
- MANUAL_UPRN_MAP = {}
+ MANUAL_UPRN_MAP = {
+ "Ardelagh Ardelagh Faris Lane Woodham Addlestone KT15 3DJ": 100061484560
+ }
asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
@@ -213,6 +223,9 @@ def app():
manual_uprn_map=MANUAL_UPRN_MAP
)
+ no_data = asset_list[asset_list["row_id"].isin(no_epc)]
+ print(no_data[[FULLADDRESS_COLUMN, POSTCODE_COLUMN]])
+
# Append the failed data to the main data
epc_data.extend(epc_data_failed)
@@ -372,7 +385,7 @@ def app():
how="left",
on="row_id"
)
- asset_list = asset_list.drop(columns=["row_id"])
+ asset_list = asset_list.drop(columns=["row_id", "index"])
# Store as an excel
filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx"
diff --git a/recommendations/Costs.py b/recommendations/Costs.py
index ee4db7eb..2312dff2 100644
--- a/recommendations/Costs.py
+++ b/recommendations/Costs.py
@@ -719,8 +719,9 @@ class Costs:
"labour_days": labour_days
}
+ @classmethod
def solar_pv(
- self,
+ cls,
n_panels: int | float,
has_battery: bool = False,
array_cost=None,
@@ -774,7 +775,7 @@ class Costs:
# We add an additional cost for scaffolding
# The costs from installers exclude VAT
- vat = subtotal * self.VAT_RATE
+ vat = subtotal * cls.VAT_RATE
total_cost = subtotal + vat
# Labour hours are based on estimates from online research but an average team seems to consist of 3 people
diff --git a/recommendations/HeatingRecommender.py b/recommendations/HeatingRecommender.py
index 1eab7d42..c5c07f89 100644
--- a/recommendations/HeatingRecommender.py
+++ b/recommendations/HeatingRecommender.py
@@ -1,6 +1,5 @@
import re
import backend.app.assumptions as assumptions
-from etl.customers.immo.pilot.asset_list import non_invasive_recommendations
from recommendations.Costs import Costs, BOILER_UPGRADE_SCHEME_ASHP_VALUE
from recommendations.recommendation_utils import (
check_simulation_difference, override_costs, combine_recommendation_configs
@@ -632,7 +631,8 @@ class HeatingRecommender:
heating_controls_only,
system_change,
system_type,
- measure_type
+ measure_type,
+ non_intrusive_recommendation=None
):
"""
Given a recommendation for heating controls, and a recommendation for the heating system, we combine the two
@@ -650,8 +650,13 @@ class HeatingRecommender:
:param system_type: The type of heating system we are recommending
:param measure_type: The type of measure we are recommending - more granular than the "type" field, allowing us
to distinguish between different types of heating recommendations
+ :param non_intrusive_recommendation: A non-intrusive recommendation, which may specify the number of SAP points
+ or a cost for this recommendation
"""
+ if non_intrusive_recommendation is None:
+ non_intrusive_recommendation = {}
+
# We produce recommendations with & without heating controls
# We will also produce a recommendation for heating controls only
heating_controls_switch = [True, False] if controls_recommendations else [False]
@@ -699,13 +704,14 @@ class HeatingRecommender:
"description": recommendation_description,
"starting_u_value": None,
"new_u_value": None,
- "sap_points": None,
+ "sap_points": non_intrusive_recommendation.get("sap_points"),
"already_installed": already_installed,
**total_costs,
"simulation_config": recommendation_simulation_config,
"description_simulation": recommendation_description_simulation,
# We insert the heating system type here
- "system_type": system_type
+ "system_type": system_type,
+ "survey": non_intrusive_recommendation.get("survey", False)
}
output.append(recommendation)
@@ -808,6 +814,13 @@ class HeatingRecommender:
# No recommendation needed
return
+ # We check if there is a high heat retention non-intrusive recommendation
+ non_intrusive_recommendation = next(
+ (r for r in self.property.non_invasive_recommendations if
+ r["type"] == "high_heat_retention_storage_heater"),
+ {}
+ )
+
# We check if the property has dual heating in place with a boiler and storage heaters
if self.dual_heating:
new_heating_description = self.DUAL_HEATING_DESCRIPTIONS[
@@ -896,7 +909,8 @@ class HeatingRecommender:
heating_controls_only=heating_controls_only,
system_change=system_change,
system_type="high_heat_retention_storage_heater",
- measure_type="high_heat_retention_storage_heater"
+ measure_type="high_heat_retention_storage_heater",
+ non_intrusive_recommendation=non_intrusive_recommendation
)
if _return:
return recommendations
diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py
index 189581d8..15614a0b 100644
--- a/recommendations/Recommendations.py
+++ b/recommendations/Recommendations.py
@@ -528,6 +528,9 @@ class Recommendations:
previous_phase_values = {
"sap": float(property_instance.data["current-energy-efficiency"]),
+ # For carbon, even though we generally use the updated figure which includes the carbon
+ # associated to appliances, for this scoring process we use the EPC carbon value. This means
+ # that we don't overestimate the impact since the model uses the EPC carbon value
"carbon": float(property_instance.data["co2-emissions-current"]),
"heat_demand": float(property_instance.data["energy-consumption-current"]),
}
@@ -691,6 +694,10 @@ class Recommendations:
"""
This method inserts the kwh savings and the bill savings that the customer will make from the recommendations
based on the predictions from the ML model
+
+ It also ensures we base our solar savings and solar carbon savings from the calculations based on
+ the solar API and size of the array, instead of ML model
+
:param property_instance: Instance of the Property class, for the home associated to property_id
:param kwh_simulation_predictions: dictionary of predictions from the model apis
:param property_recommendations: dictionary of recommendations for the property
@@ -824,6 +831,12 @@ class Recommendations:
if rec["type"] == "solar_pv":
rec["kwh_savings"] = rec_impact["solar_kwh_savings"].values[0]
+
+ # Calculate carbon savings from this - emissions in kg and convert to tonnes
+ emissions_kg = rec["kwh_savings"] * assumptions.ELECTRICITY_CARBON_INTENSITY
+ emissions_tonnes = emissions_kg / 1000
+
+ rec["co2_equivalent_savings"] = emissions_tonnes
rec["energy_cost_savings"] = (
rec_impact["solar_kwh_savings"].values[0] * AnnualBillSavings.ELECTRICITY_PRICE_CAP
)
diff --git a/recommendations/RoofRecommendations.py b/recommendations/RoofRecommendations.py
index 6778e886..b7e34406 100644
--- a/recommendations/RoofRecommendations.py
+++ b/recommendations/RoofRecommendations.py
@@ -138,6 +138,10 @@ class RoofRecommendations:
u_value = self.property.roof["thermal_transmittance"]
+ # If we have a flat roof but we don't have flat roof as a measure, we exit
+ if self.property.roof["is_flat"] and "flat_roof_insulation" not in measures:
+ return
+
# We check if the roof is already insulated and if so, we exit
# Building regulations part L recommend installing at least 270mm of insulation, however generally we
diff --git a/recommendations/SolarPvRecommendations.py b/recommendations/SolarPvRecommendations.py
index 66c1d0c3..95f189d3 100644
--- a/recommendations/SolarPvRecommendations.py
+++ b/recommendations/SolarPvRecommendations.py
@@ -103,13 +103,22 @@ class SolarPvRecommendations:
for rank, recommendation_config in best_configurations.iterrows():
# If we dont have the panneled_roof_area in the recommendation_config we calculate it
if recommendation_config.get("panneled_roof_area", None):
- roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / total_roof_area * 100)
+ # We spread the coverage across the individual units
+ roof_coverage_percent = round(
+ ((recommendation_config["panneled_roof_area"] / total_roof_area) * 100) / n_units
+ )
else:
raise Exception("IMPLEMENT ME")
+
+ n_floors = (
+ self.property.number_of_storeys["number_of_storeys"] if
+ self.property.number_of_storeys["number_of_storeys"] is not None else 3
+ )
+
total_cost = self.costs.solar_pv(
array_cost=recommendation_config.get("cost", None),
n_panels=recommendation_config["n_panels"],
- n_floors=self.property.number_of_storeys["number_of_storeys"],
+ n_floors=n_floors,
needs_inverter=True,
)["total"] / n_units
diff --git a/recommendations/county_to_region.py b/recommendations/county_to_region.py
index f7d5193f..e84b5698 100644
--- a/recommendations/county_to_region.py
+++ b/recommendations/county_to_region.py
@@ -111,8 +111,11 @@ county_to_region_map = {
'Windsor and Maidenhead': 'South East England', 'Woking': 'South East England', 'Wokingham': 'South East England',
'Worthing': 'South East England', 'Wycombe': 'South East England',
'Bath and North East Somerset': 'South West England', 'Bournemouth': 'South West England',
- 'Bristol': 'South West England', 'Cheltenham': 'South West England', 'Christchurch': 'South West England',
- 'City of Bristol': 'South West England', 'Cornwall': 'South West England', 'Cotswold': 'South West England',
+ 'Bristol': 'South West England',
+ 'Cheltenham': 'South West England', 'Christchurch': 'South West England',
+ 'City of Bristol': 'South West England',
+ 'Bristol, City of': 'South West England',
+ 'Cornwall': 'South West England', 'Cotswold': 'South West England',
'Devon': 'South West England', 'Dorset': 'South West England', 'East Devon': 'South West England',
'East Dorset': 'South West England', 'Exeter': 'South West England', 'Forest of Dean': 'South West England',
'Gloucester': 'South West England', 'Gloucestershire': 'South West England',
diff --git a/recommendations/optimiser/optimiser_functions.py b/recommendations/optimiser/optimiser_functions.py
index c1123e3d..8c15673d 100644
--- a/recommendations/optimiser/optimiser_functions.py
+++ b/recommendations/optimiser/optimiser_functions.py
@@ -23,6 +23,10 @@ def prepare_input_measures(property_recommendations, goal):
# if the recommendation is a solar recommendation with a battery, we exclude it from the optimisation.
recs = [r for r in recs if ~r["has_battery"]]
+ recs_to_append = [rec for rec in recs if rec["energy_cost_savings"] >= 0]
+ if not recs_to_append:
+ continue
+
input_measures.append(
[
{
@@ -31,7 +35,7 @@ def prepare_input_measures(property_recommendations, goal):
"gain": rec[goal_key],
"type": rec["type"]
}
- for rec in recs
+ for rec in recs if rec["energy_cost_savings"] >= 0
]
)