Merge pull request #377 from Hestia-Homes/l-and-g

L and g
This commit is contained in:
KhalimCK 2025-01-28 15:14:08 +00:00 committed by GitHub
commit f79deb48ae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 1516 additions and 291 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Fastapi-backend" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Stonewater-wave-3" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyNamespacePackagesService">

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Fastapi-backend" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Stonewater-wave-3" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>

View file

@ -12,6 +12,8 @@ class Funding:
and flag any tenant-specific requirements that need to be considered for the funding to be attained
"""
SCHEMES = ["eco4", "gbis", "whlg"]
ECO_SAP_SCORE_THREHOLDS = [
{'Band': 'High_A', 'From': 96.0, 'Up to': 100.0, 'Mid-point': 98.0},
{'Band': 'Low_A', 'From': 92.0, 'Up to': 96.0, 'Mid-point': 94.0},
@ -34,10 +36,12 @@ class Funding:
tenure: HousingType,
starting_epc,
starting_sap,
postcode,
floor_area,
council_tax_band,
property_recommendations,
project_scores_matrix,
whlg_eligible_postcodes,
gbis_abs_rate: int,
eco4_abs_rate: int,
):
@ -47,6 +51,10 @@ class Funding:
:param starting_epc: The current EPC rating of the property
:param starting_sap: The current SAP score for the property
:param floor_area: The total floor area of the property
:param council_tax_band: The council tax band of the property
:param property_recommendations: The recommendations for the property
:param project_scores_matrix: The matrix of project scores for ECO4
:param whlg_eligible_postcodes: The postcodes eligible for WHLG
:param gbis_abs_rate: The assumed £/abs achieved by the installer for GBIS
:param eco4_abs_rate: The assumed £/abs achieved by the installer for ECO4
"""
@ -58,6 +66,7 @@ class Funding:
self.tenure = tenure
self.starting_epc = starting_epc
self.starting_sap = starting_sap
self.postcode = postcode
self.starting_eco_band = self.sap_to_eco_band(self.starting_sap)
self.floor_area_segment = self.classify_floor_area(floor_area)
self.gbis_abs_rate = gbis_abs_rate
@ -75,6 +84,11 @@ class Funding:
(project_scores_matrix["Starting Band"] == self.starting_eco_band)
]
# The postcode column is already lower case
self.whlg_eligible_postcodes = whlg_eligible_postcodes[
whlg_eligible_postcodes["Postcode"] == self.postcode.lower()
]
# Store the final outputs
self.gbis_eligibiltiy = {}
self.eco4_eligibility = {}
@ -82,6 +96,8 @@ class Funding:
def output(
self,
scheme: str,
eligible: bool,
measure_types: List[str],
estimated_funding: float,
notify_tenant_benefits_requirements: bool,
@ -90,12 +106,18 @@ class Funding:
):
""""
"""
if scheme not in self.SCHEMES:
raise ValueError("Scheme not recognised")
return {
"scheme": scheme,
"eligible": eligible,
"measure_types": measure_types,
"estimated_funding": estimated_funding,
"notify_tenant_benefits_requirements": notify_tenant_benefits_requirements,
"notify_council_tax_band_requirements": notify_council_tax_band_requirements,
"notify_tenant_low_income_requirements": notify_tenant_low_income_requirements
"requires_benefits": notify_tenant_benefits_requirements,
"requires_council_tax_band": notify_council_tax_band_requirements,
"requires_low_income": notify_tenant_low_income_requirements
}
@staticmethod
@ -234,6 +256,8 @@ class Funding:
# If the council tax band is missing, we notify the customer that this is a requirement that
# should be checked
return self.output(
scheme="gbis",
eligible=True,
measure_types=[recommended_measure["measure_type"]],
estimated_funding=recommended_measure["estimated_funding"],
notify_tenant_benefits_requirements=False,
@ -251,6 +275,8 @@ class Funding:
# We find the best measure for GBIS
recommended_measure = self.find_best_gbis_measure(measures=valid_measures)
return self.output(
scheme="gbis",
eligible=True,
measure_types=[recommended_measure["measure_type"]],
estimated_funding=recommended_measure["estimated_funding"],
notify_tenant_benefits_requirements=True,
@ -260,6 +286,8 @@ class Funding:
# Otherwise, no funding availability
return self.output(
scheme="gbis",
eligible=False,
measure_types=[],
estimated_funding=0,
notify_tenant_benefits_requirements=False,
@ -279,6 +307,23 @@ class Funding:
raise NotImplementedError("Implement social/oo")
def whlg(self):
    """
    Work out eligibility for the Warm Homes: Local Grant (WHLG) scheme.

    Social housing is recorded as not eligible. For any other tenure the postcode check is still a
    placeholder: nothing is stored on the instance yet.
    """
    is_social = self.tenure == "Social"

    if not is_social:
        # Placeholder branch - the postcode was found in the eligible postcodes table
        if not self.whlg_eligible_postcodes.empty:
            print("Eligible implement me!")
        return

    # We can't do anything for social housing
    not_eligible = self.output(
        scheme="whlg",
        eligible=False,
        measure_types=[],
        estimated_funding=0,
        notify_tenant_benefits_requirements=False,
        notify_council_tax_band_requirements=False,
        notify_tenant_low_income_requirements=False
    )
    self.whlg_eligibility = not_eligible
    return
def eco4(self):
if self.tenure == "Private":
self.eco4_eligibiltiy = self.eco4_prs()
@ -292,4 +337,4 @@ class Funding:
self.gbis()
# self.eco4()
# self.whlg()
self.whlg()

View file

@ -133,9 +133,14 @@ class Property:
self.energy_cost_estimates = {}
self.energy_consumption_estimates = {}
# when storing the energy, we'll also
self.energy = {
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
"co2_emissions": epc_record.get("co2_emissions_current"),
"epc_co2_emissions": epc_record.get("co2_emissions_current"),
# These will be added in once we estimate the amount of emissions from appliances - using the carbon
# intensity of electricity
"appliances_co2_emissions": None,
"co2_emissions": None
}
self.ventilation = {
"ventilation": epc_record.get("mechanical_ventilation"),
@ -725,6 +730,15 @@ class Property:
"unadjusted": unadjusted_kwh_estimates
}
# Update carbon with appliances
self.energy["appliances_co2_emissions"] = (
(unadjusted_kwh_estimates["appliances"] * assumptions.ELECTRICITY_CARBON_INTENSITY) / 1000
)
# Re-calculate total CO2 emissions
self.energy["co2_emissions"] = float(np.round(
self.energy["epc_co2_emissions"] + self.energy["appliances_co2_emissions"], 2
))
def set_spatial(self, spatial: pd.DataFrame):
"""
Sets whether the property is in a conservation area given the output of the ConservationAreaClient

View file

@ -139,8 +139,8 @@ class SearchEpc:
}
NODATA = {
"status": 201,
"message": "No data",
"status": 204,
"message": "no data",
"error": None
}
@ -155,7 +155,7 @@ class SearchEpc:
uprn: [int, None] = None,
size=None,
property_type=None,
fast=False
fast=False,
):
"""
Address lines 1 and postcode are mandatory fields. The other address lines are optional
@ -248,14 +248,10 @@ class SearchEpc:
else:
return None
def get_epc(self, params=None, size=None):
# Get the EPC data with retries
size = size if size is not None else self.size
if params is None:
if self.uprn:
params = {"uprn": self.uprn}
else:
params = {"address": self.address1, "postcode": self.postcode}
def _get_epc(self, params, size):
"""
To be called by get_epc() - not for external usage
"""
url = os.path.join(self.client.domestic.host, "search")
if size:
@ -268,24 +264,20 @@ class SearchEpc:
if response:
self.data = response
return self.SUCCESS
return {
"response": response,
"msg": self.SUCCESS
}
if retry > 0:
logger.info("Failed previous attempt but retry successful")
# If we got nothing, final try
if not response:
return {
"status": 204,
"message": "no data",
"error": None
"response": response,
"msg": self.NODATA
}
return {
"status": 200,
"message": "success",
"error": None
}
except Exception as e:
if retry < self.max_retries - 1:
# If not the last retry, wait for 3 seconds before retrying
@ -293,11 +285,54 @@ class SearchEpc:
else:
# If it's the last retry, we continue
return {
"status": 500,
"message": "Could not retrieve EPC data",
"error": str(e)
"response": {},
"msg": {
"status": 500,
"message": "Could not retrieve EPC data",
"error": str(e)
}
}
def get_epc(self, params=None, size=None):
    """
    Fetch EPC data (with retries, via _get_epc) and store the result on self.data.

    If params is provided, a single search is made with exactly those parameters. Otherwise we search
    by UPRN (when we have one) and then by address and postcode, because properties are sometimes
    listed under the wrong UPRN. The rows of both searches are merged and de-duplicated on lmk-key.

    :param params: Optional search parameters, passed straight through to the API
    :param size: Optional page size, defaults to self.size
    :return: The status message dict of the search. When searching by UPRN and address, this is the
        message of a successful search if either search succeeded, otherwise the message of the
        address search
    """
    size = size if size is not None else self.size

    if params:
        output = self._get_epc(params=params, size=size)
        if output["msg"]["status"] == 200:
            self.data = output["response"]
        return output["msg"]

    rows = []
    success_msg = None

    # We attempt the search with uprn params
    if self.uprn:
        uprn_response = self._get_epc(params={"uprn": self.uprn}, size=size)
        if uprn_response["msg"]["status"] == 200:
            rows.extend(uprn_response["response"]["rows"])
            success_msg = uprn_response["msg"]

    # We always make a second attempt by address, since we find that properties are sometimes listed
    # under the wrong UPRN
    address_response = self._get_epc(
        params={"address": self.address1, "postcode": self.postcode}, size=size
    )
    if address_response["msg"]["status"] == 200:
        # We update the data with the correct uprn
        if self.uprn:
            for row in address_response["response"]["rows"]:
                row["uprn"] = self.uprn
        rows.extend(address_response["response"]["rows"])
        success_msg = address_response["msg"]

    if success_msg is None:
        # Neither search returned data, so we report the outcome of the final search
        return address_response["msg"]

    # We now de-dupe on lmk-key, keeping the first occurrence of each certificate
    seen = set()
    unique_rows = []
    for row in rows:
        if row["lmk-key"] in seen:
            continue
        seen.add(row["lmk-key"])
        unique_rows.append(row)

    # Store the merged result - previously this was built and then discarded
    self.data = {"rows": unique_rows}
    return success_msg
def filter_rows(self, rows, property_type=None, address=None):
"""
This method should not be used when property_type and address are both not None
@ -693,9 +728,20 @@ class SearchEpc:
estimated_epc[variable] = str(int(estimated_epc[variable]))
# This is a string
estimated_epc["low-energy-fixed-light-count"] = str(estimated_epc["low-energy-fixed-light-count"])
estimated_epc["low-energy-fixed-light-count"] = (
str(estimated_epc["low-energy-fixed-light-count"]) if estimated_epc["low-energy-fixed-light-count"] else ""
)
# This is an int
estimated_epc["photo-supply"] = (
int(np.round(estimated_epc["photo-supply"])) if estimated_epc["photo-supply"] else estimated_epc[
"photo-supply"]
)
estimated_epc["postcode"] = self.postcode
if not self.uprn:
# Update self.uprn too
self.uprn = hash(self.address1 + self.postcode)
estimated_epc["uprn"] = self.uprn
estimated_epc["address"] = self.full_address
# Indicate that this epc was estimated

View file

@ -51,6 +51,9 @@ class GoogleSolarApi:
MIN_UNIT_PANELS = 4 # Minimum number of panels we allow for a domestic building
MIN_BUILDING_PANELS = 10 # Minimum number of panels we allow for a block of flats
# Max area of a roof space we allow panels for
PERCENTAGE_OF_ROOF_LIMIT = 0.8
def __init__(self, api_key, max_retries=5):
"""
Initialize the GoogleSolarApi class with the provided API key and maximum retries.
@ -159,10 +162,11 @@ class GoogleSolarApi:
# Automatically exclude north-facing segments
self.exclude_north_facing_segments(property_instance=property_instance)
# If a property is semi-detached, it's possible for us to include segments from an attached unit
if (property_instance.data["built-form"] == "Semi-Detached") and (
property_instance.data["extension-count"] == 0
):
self.exclude_likely_duplicate_surfaces()
if property_instance is not None:
if (property_instance.data["built-form"] == "Semi-Detached") and (
property_instance.data["extension-count"] == 0
):
self.exclude_likely_duplicate_surfaces()
self.roof_area = self.insights_data["solarPotential"]["wholeRoofStats"]['areaMeters2']
self.floor_area = self.insights_data["solarPotential"]["wholeRoofStats"]['groundAreaMeters2']
@ -179,7 +183,9 @@ class GoogleSolarApi:
# We now start finding the solar panel configurations
self.optimise_solar_configuration(
energy_consumption=energy_consumption, is_building=is_building, property_instance=property_instance
energy_consumption=energy_consumption,
is_building=is_building,
property_instance=property_instance
)
# Finally, if we have a double property, we half the data we stored area
@ -295,7 +301,11 @@ class GoogleSolarApi:
continue
if cost_instance is None:
total_cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000)
total_cost = Costs.solar_pv(
n_panels=roi_summary["n_panels"].sum(),
has_battery=False,
n_floors=3, # Assume the most amount of scaffolding
)["total"]
else:
total_cost = cost_instance.solar_pv(
n_panels=roi_summary["n_panels"].sum(),
@ -491,6 +501,11 @@ class GoogleSolarApi:
panel_performance = panel_performance.drop(columns=["n_panels_halved"])
panel_performance = panel_performance[panel_performance["n_panels"] >= min_panels]
# Finally, we prevent the panelled roof area from exceeding a limit
panel_performance = panel_performance[
panel_performance["panneled_roof_area"] <= self.roof_area * self.PERCENTAGE_OF_ROOF_LIMIT
]
self.panel_performance = panel_performance
def exclude_north_facing_segments(self, property_instance):

View file

@ -1,7 +1,7 @@
# Assumes that the average efficiency of an air source heat pump is 250%, taking the median of the 200-400% range,
# which is often quoted as a sensible efficiency range for air source heat pumps.
# We assume that the ASHP efficiency is 280%, which is the minimum that Cotswolds Energy Group achieves, as
# they target this
PESSIMISTIC_ASHP_EFFICIENCY = 200
AVERAGE_ASHP_EFFICIENCY = 250
AVERAGE_ASHP_EFFICIENCY = 280
# Conservative estimate of the proportion of electricity that will be consumed, whereas the rest will
# be exported. These are averages based on Google research. E.g
@ -14,6 +14,9 @@ RDSAP_AREA_PER_PANEL = 3.4
SOCIAL_TENURES = ["Rented (social)", "rental (social)"]
# Carbon intensity of electricity, as of 16th Jan 2025
ELECTRICITY_CARBON_INTENSITY = 0.232
DESCRIPTIONS_TO_FUEL_TYPES = {
"Air source heat pump, radiators, electric": {
"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100

View file

@ -121,7 +121,7 @@ def extract_portfolio_aggregation_data(
# We can now calculate multiple outputs based on default recommendations
carbon_savings = sum([r["co2_equivalent_savings"] for r in default_recommendations])
pre_retrofit_co2 = p.data["co2-emissions-current"]
pre_retrofit_co2 = p.energy["co2_emissions"]
post_retrofit_co2 = pre_retrofit_co2 - carbon_savings
pre_retrofit_energy_bill = sum(p.current_energy_bill.values())
@ -339,6 +339,9 @@ def extract_property_request_data(
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else True
if has_uprn:
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
if has_uprn:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
@ -366,23 +369,45 @@ def extract_property_request_data(
property_non_invasive_recommendations["recommendations"] = str(transformed)
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
# Check if the valuation data has uprn
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else True
if valuation_has_uprn:
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
if valuation_has_uprn:
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
else:
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), None)
return patch, property_already_installed, property_non_invasive_recommendations, property_valution
def get_eco_project_scores_matrix():
data = read_csv_from_s3(
def get_funding_data():
"""
This function retrieves the eco project scores matrix and the warm homes local grant funding data
:return:
"""
project_scores_matrix = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/ECO4 Full Project Scores Matrix.csv",
)
df = pd.DataFrame(data)
df.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
df["Cost Savings"] = df["Cost Savings"].astype(float)
return df
project_scores_matrix = pd.DataFrame(project_scores_matrix)
project_scores_matrix.columns = ['Floor Area Segment', 'Starting Band', 'Finishing Band', 'Cost Savings']
project_scores_matrix["Cost Savings"] = project_scores_matrix["Cost Savings"].astype(float)
whlg_eligible_postcodes = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/whlg eligible postcodes.csv",
)
whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
return project_scores_matrix, whlg_eligible_postcodes
router = APIRouter(
@ -407,6 +432,7 @@ async def trigger_plan(body: PlanTriggerRequest):
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):
@ -443,9 +469,12 @@ async def trigger_plan(body: PlanTriggerRequest):
# Create a record in db
property_id, is_new = create_property(
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean,
epc_searcher.uprn,
energy_assessment
session=session,
portfolio_id=body.portfolio_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
uprn=epc_searcher.uprn,
energy_assessment=energy_assessment
)
if not is_new and not body.multi_plan:
continue
@ -526,7 +555,7 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Reading in materials and cleaned datasets")
materials = get_materials(session)
cleaned = get_cleaned()
eco_project_scores_matrix = get_eco_project_scores_matrix()
eco_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
@ -670,9 +699,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# Insert the predictions into the recommendations and run the optimiser
# TODO: If a recommendation has a negative impact on SAP, we should remove it - this seems to have become a
# possibility with heating system
# TODO: After optimising, if there are any cheap, quick win measures (e.g. insulate water tank with hot water
# cylinder jacket), we should add these to the recommendations as default
# possibility with heating system?
for p in input_properties:
if not recommendations.get(p.id):
@ -680,37 +707,42 @@ async def trigger_plan(body: PlanTriggerRequest):
input_measures = prepare_input_measures(recommendations[p.id], body.goal)
current_sap_points = int(p.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
if not body.optimise:
if body.goal != "Increasing EPC":
raise NotImplementedError("Only EPC optimisation is currently supported")
solution = []
for sub_list in input_measures:
# Select the entry with the highest gain, and if tied, choose the one with the lowest cost
best_measure = max(sub_list, key=lambda x: (x['gain'], -x['cost']))
solution.append(best_measure)
if not input_measures[0]:
# This means that we have no defaults
selected_recommendations = {}
else:
if body.budget:
optimiser = GainOptimiser(
input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
)
current_sap_points = int(p.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
if not body.optimise:
if body.goal != "Increasing EPC":
raise NotImplementedError("Only EPC optimisation is currently supported")
solution = []
for sub_list in input_measures:
# Select the entry with the highest gain, and if tied, choose the one with the lowest cost
best_measure = max(sub_list, key=lambda x: (x['gain'], -x['cost']))
solution.append(best_measure)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures,
min_gain=sap_gain
)
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
if body.budget:
optimiser = GainOptimiser(
input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures,
min_gain=sap_gain
)
selected_recommendations = {r["id"] for r in solution}
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
selected_recommendations = {r["id"] for r in solution}
# If wall insulation is selected, we also include mechanical ventilation as a best practice measure
if any(x in [r["type"] for r in solution] for x in [
@ -749,25 +781,53 @@ async def trigger_plan(body: PlanTriggerRequest):
]
recommendations[p.id] = final_recommendations
# when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all
# of them
# TODO: We can probably do better and optimise at the building level - this is temp
logger.info("Adjusting solar PV recommendations for buildings")
building_ids = set([p.building_id for p in input_properties if p.building_id is not None])
for bid in building_ids:
# We check if any of them have solar PV
building = [p for p in input_properties if p.building_id == bid]
has_solar = False
for unit in building:
# Get default recommendations
has_solar = len([r for r in recommendations[unit.id] if r["default"] and r["type"] == "solar_pv"]) > 0
if has_solar:
break
if has_solar:
# We adjust the units within the building
for unit in building:
for rec in recommendations[unit.id]:
if rec["type"] == "solar_pv":
# This is straightforward, we just set the default to True, since when we're at a building
# level, we only allow 1 solar PV option for each unit. If we change this, this logic will
# need to be updated
rec["default"] = True
# ~~~~~~~~~~~~~~~~
# Funding
# ~~~~~~~~~~~~~~~~
# for p in input_properties:
# funding_calulator = Funding(
# tenure=body.housing_type,
# starting_epc=p.data["current-energy-rating"],
# starting_sap=int(p.data["current-energy-efficiency"]),
# floor_area=p.floor_area,
# council_tax_band=None, # This is seemingly always None at the moment
# property_recommendations=recommendations[p.id],
# project_scores_matrix=eco_project_scores_matrix,
# gbis_abs_rate=20,
# eco4_abs_rate=20,
# )
# funding_calulator.check_eligibiltiy()
# # Insert finding
# p.insert_funding(funding_calulator)
for p in input_properties:
funding_calulator = Funding(
tenure=body.housing_type,
starting_epc=p.data["current-energy-rating"],
starting_sap=int(p.data["current-energy-efficiency"]),
postcode=p.postcode,
floor_area=p.floor_area,
council_tax_band=None, # This is seemingly always None at the moment
property_recommendations=recommendations[p.id],
project_scores_matrix=eco_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
gbis_abs_rate=20,
eco4_abs_rate=15,
)
funding_calulator.check_eligibiltiy()
# Insert funding
p.insert_funding(funding_calulator)
logger.info("Uploading recommendations to the database")
# If we have any work to do, we create a new scenario

View file

@ -28,8 +28,8 @@ class AnnualBillSavings:
# Latest price cap figures from Ofgem are for January 2025
# https://www.ofgem.gov.uk/energy-price-cap
ELECTRICITY_PRICE_CAP = 0.2236
GAS_PRICE_CAP = 0.0548
ELECTRICITY_PRICE_CAP = 0.2486
GAS_PRICE_CAP = 0.0634
# This is the most recent export payment figure, at 9.28p/kWh
# Smart export guarantee rates can be found here:
# https://www.sunsave.energy/solar-panels-advice/exporting-to-the-grid/best-seg-rates
@ -39,8 +39,8 @@ class AnnualBillSavings:
PRICE_FACTOR = 0.09549999999999999
# Daily standard charge, based on average across England, Scotland and Wales, and includes VAT
DAILY_STANDARD_CHARGE_GAS = 0.3143
DAILY_STANDARD_CHARGE_ELECTRICITY = 0.601
DAILY_STANDARD_CHARGE_GAS = 0.3165
DAILY_STANDARD_CHARGE_ELECTRICITY = 0.6097
# Based on https://www.nottenergy.com/advice-and-tools/project-energy-cost-comparison
# For July 2024. These quotes are based on the east midlands region, so we

View file

@ -0,0 +1,50 @@
import pytest
import os
from backend.SearchEpc import SearchEpc # Replace with your actual module name
from dotenv import load_dotenv
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
class TestSearchEpcIntegration:
    """
    Integration tests for SearchEpc. These make actual calls to the EPC API, so EPC_AUTH_TOKEN must be
    available in the environment.
    """

    @pytest.mark.parametrize(
        # The names here must match both the tuple contents below and the test function's arguments
        "address, postcode, uprn, skip_os, lmk_key, n_old_epcs",
        [
            # Test case 1: Valid address and postcode, skipping OS
            # In this case, the property is an individual flat but the uprn associated to the
            # EPC is for the building as a whole, possibly because there was a conversion of sorts
            (
                "Garden Flat, 48 Bedminster Parade", "BS3 4HS", 308249, True,
                "260907a5431fa073d193cc6bbec51fbf1ba9a61845ab2503f85aa19ce3ed6afd", 1,
            ),
            # Test case 2: Another valid address and postcode
            # In this case, the newest EPC does not have a uprn associated to it. If we did a search by
            # uprn, we would get an old EPC
            (
                "Flat 8, Hainton House", "DN32 9AQ", 10090082018, True,
                "bd1149a20a73397184f07a9955f872424826e70f4870c058d71be887766ee1f8", 3,
            ),
        ],
    )
    def test_find_property(self, address, postcode, uprn, skip_os, lmk_key, n_old_epcs):
        """
        Integration test for `find_property`, making actual API calls.

        :param address: First line of the address to search for
        :param postcode: Postcode to search for
        :param uprn: UPRN we expect to end up on the newest EPC
        :param skip_os: Passed through to find_property
        :param lmk_key: The lmk-key of the EPC we expect to be selected as the newest
        :param n_old_epcs: The number of older EPCs we expect to be found
        """
        # Provide your actual API keys or tokens here
        os_api_key = ""

        # Initialize the SearchEpc instance
        epc_searcher = SearchEpc(
            address1=address,
            postcode=postcode,
            uprn=uprn,
            auth_token=EPC_AUTH_TOKEN,
            os_api_key=os_api_key,
        )

        # Execute the method
        epc_searcher.find_property(skip_os=skip_os)

        # We check that we have the correct epc
        assert epc_searcher.newest_epc["lmk-key"] == lmk_key
        assert epc_searcher.newest_epc["uprn"] == uprn
        assert len(epc_searcher.older_epcs) == n_old_epcs

View file

@ -83,8 +83,11 @@ def api_call_decorator(func):
results = []
page_size = kwargs.get('page_size', None)
response_data = {}
n_calls = 0
while url:
logger.info("Making call for page: " + str(n_calls + 1))
n_calls += 1
response = requests.request(http_method, url, headers=self.headers, json=data)
# Handle the response
@ -93,6 +96,7 @@ def api_call_decorator(func):
if page_size:
results.extend(response_json.get('value', []))
url = response_json.get('@odata.nextLink', None)
logger.info(f"Next page URL: {url}")
else:
response_data = response_json # Capture the full response for consistency
break
@ -270,6 +274,48 @@ class SharePointClient:
return file_content
def download_sharepoint_folder(self, drive_id, folder_path, download_dir, excluded_file_types=None):
    """
    Downloads all files in a SharePoint folder, including its subfolders, to the specified local directory.

    :param drive_id: The ID of the SharePoint drive.
    :param folder_path: The path of the folder in SharePoint.
    :param download_dir: The local directory to save the downloaded files.
    :param excluded_file_types: A list of file extensions (without the leading dot) to exclude from
        download (default is None, meaning nothing is excluded). Applies to subfolders as well.
    """
    excluded_file_types = [] if excluded_file_types is None else excluded_file_types

    # Ensure the download directory exists
    os.makedirs(download_dir, exist_ok=True)

    # List folder contents
    folder_contents = self.list_folder_contents(drive_id, folder_path)

    for item in folder_contents.get('value', []):
        if item.get('folder'):  # Check if it's a folder
            # Recursively handle subfolders, carrying the exclusions down so that they are
            # honoured at every level and not just in the top-level folder
            subfolder_path = f"{folder_path}/{item['name']}"
            subfolder_dir = os.path.join(download_dir, item['name'])
            self.download_sharepoint_folder(
                drive_id, subfolder_path, subfolder_dir, excluded_file_types=excluded_file_types
            )
            continue

        # It's a file, download it unless its extension is excluded
        file_name = item['name']
        if file_name.split(".")[-1] in excluded_file_types:
            continue

        download_url = item['@microsoft.graph.downloadUrl']
        logger.info(f"Downloading file: {file_name}")
        file_content = self.download_sharepoint_file(download_url)

        # Save the file locally
        file_path = os.path.join(download_dir, file_name)
        with open(file_path, 'wb') as f:
            f.write(file_content.read())
        logger.info(f"File saved to: {file_path}")
def app():
# Customers for WC 18/11/2024

View file

@ -21,10 +21,10 @@ def app():
"property_type": "House", "built-form": "Semi-Detached"
},
{
"address": "21 High Street", "postcode": "CB23 8AB", "uprn": 100090136026
"address": "21 High Street", "postcode": "CB23 8AB", "uprn": 100090144815
},
{
"address": "22 High Street", "postcode": "CB23 8AB", "uprn": 100090136027
"address": "22 High Street", "postcode": "CB23 8AB", "uprn": 100090144816
},
{
"address": "5 Bunkers Hill", "postcode": "CB3 0LY", "uprn": 10008078615
@ -52,8 +52,8 @@ def app():
valuations_data = [
{'uprn': 100090136018, "valuation": 586_000},
{'uprn': 100090136026, "valuation": 551_000},
{'uprn': 100090136027, "valuation": 844_000},
{'uprn': 100090144815, "valuation": 446_000},
{'uprn': 100090144816, "valuation": 448_000},
{'uprn': 10008078615, "valuation": 763_000},
{'uprn': 10008078616, "valuation": 616_000},
{'uprn': 10008078617, "valuation": 593_000},

View file

@ -0,0 +1,166 @@
"""
This script prepares the asset list for modelling the properties from the L&G dataset, for their January IC
"""
import pandas as pd
import numpy as np
from etl.route_march_data_pull.app import get_data
from utils.s3 import save_csv_to_s3
PORTFOLIO_ID = 124
USER_ID = 8
def app():
# One-off preparation script for the portfolio identified by PORTFOLIO_ID / USER_ID:
#   1) reads the "Basildon" sheet of a local Excel workbook and builds address columns
#   2) pulls EPC data for each row via get_data
#   3) combines the rows that were matched with the rows that had no EPC (using a manually
#      prepared csv of UPRNs for the latter)
#   4) uploads the asset list and the non-invasive recommendations to S3
#   5) prints the request body to use when triggering the plan
# NOTE(review): the input paths are absolute paths on a local machine, so this only runs there
asset_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Basildon information for Domna/Basildon MDS v1.4 "
"(1).xlsx",
sheet_name="Basildon",
header=5
)
# Drop the final 3 rows of the sheet - presumably totals/footer rows; verify against the workbook
asset_data = asset_data.head(-3)
# Where "Address 1" is missing, we fall back to "Address 2" as the first line of the address
asset_data["address1"] = np.where(
pd.isnull(asset_data["Address 1"]),
asset_data["Address 2"],
asset_data["Address 1"]
)
asset_data["full_address"] = np.where(
pd.isnull(asset_data["Address 1"]),
asset_data["Address 2"] + ", " + asset_data["Address 3"],
asset_data["Address 1"] + ", " + asset_data["Address 2"] + ", " + asset_data["Address 3"],
)
asset_list = asset_data[["address1", "PostCode", "full_address", "Bedrooms"]]
asset_list = asset_list.reset_index(drop=True)
# row_id is used below to link the EPC results back to rows of the asset list
asset_list["row_id"] = asset_list.index
# L&G's focus:
# Measures: loft and cavity insulation, replacement thermally efficient windows, PV cells, AS heat pumps.
# NOTE: `errors` is not used anywhere below
epc_data, errors, no_epc = get_data(
asset_list=asset_list,
fulladdress_column="full_address",
address1_column="address1",
postcode_column="PostCode",
manual_uprn_map={}
)
# The rows for which no EPC was found
missed = asset_list[
asset_list["row_id"].isin(no_epc)
]
# We merge on the property types, where we have them
missed = missed.merge(
asset_data[["address1", "PostCode", "Property Type"]],
how="left",
on=["address1", "PostCode"]
)
# Remap Block: Residential to Flat
missed["Property Type"] = np.where(
missed["Property Type"] == "Block: Residential",
"Flat",
missed["Property Type"]
)
# We create the asset list - we have some properties that genuinely never had an EPC
epc_df = pd.DataFrame(epc_data)
fetched_asset_list = epc_df[["address1", "postcode", "uprn", "row_id"]]
fetched_asset_list = fetched_asset_list.merge(
asset_list[["row_id", "Bedrooms"]],
how="left",
on=["row_id"]
)
missed = missed.rename(columns={"PostCode": "postcode"}).drop(columns=["row_id"])
# missed.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/missed_epcs.csv")
# The UPRNs for the missed properties are read from a separate, locally stored csv
missed_uprns = pd.read_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/missed_epcs_uprn.csv",
)
missed = missed.merge(
missed_uprns[["address1", "postcode", "UPRN"]].rename(
columns={"UPRN": "uprn"},
),
how="left",
on=["address1", "postcode"]
)
fetched_asset_list = fetched_asset_list.drop(columns=["row_id"])
# We concatenate them
final_asset_list = pd.concat(
[fetched_asset_list, missed[["address1", "postcode", "Property Type", "Bedrooms", "uprn"]]]
)
final_asset_list = final_asset_list.rename(
columns={
"address1": "address",
"Property Type": "property_type",
"Bedrooms": "n_bedrooms"
}
)
# NOTE(review): the number of bedrooms was already merged on above, no further merge happens here
# Extract the non-invasive recommendations:
non_invasive_recommendations = []
for x in epc_data:
non_invasive_recommendations.append(
{
"uprn": x["uprn"],
"recommendations": x["find_my_epc_data"]["recommendations"]
}
)
filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(final_asset_list),
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
# Store the non-invasive recommendations in s3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
# Store the valuations data in s3
# valuations_filename = f"{USER_ID}/{PORTFOLIO_ID}/valuations.csv"
# save_csv_to_s3(
# dataframe=pd.DataFrame(valuations_data),
# bucket_name="retrofit-plan-inputs-dev",
# file_name=valuations_filename
# )
# The request body for triggering the plan - it is only printed here, not sent
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Retrofit Packages",
"multi_plan": True,
"budget": None,
"inclusions": [
"cavity_wall_insulation",
"loft_insulation",
"windows",
"solar_pv",
"air_source_heat_pump"
]
}
print(body)

View file

@ -0,0 +1,243 @@
import pandas as pd
from backend.app.utils import sap_to_epc
# Load the Basildon property export and look at how the stock splits by build period
data = pd.read_csv(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/basildon_age_breakdowns/property_202501170837.csv"
)
data["year_built"].value_counts()
# Counts recorded from the call above:
# 1950-1966 26
# 1967-1975 37
# 1976-1982 37
# 1983-1990 33
# 1991-1995 139
# 1996-2002 42
# 2003-2006 50

# Combine property type and built form into a single label, e.g. "House: Mid-Terrace"
full_property_type = data["property_type"] + ": " + data["built_form"]
data["full_property_type"] = full_property_type

# Houses and bungalows only, so their built form can be inspected separately
is_house_or_bungalow = data["property_type"].isin(["House", "Bungalow"])
houses = data[is_house_or_bungalow]
houses["built_form"].value_counts()
data["property_type"].value_counts()
data["full_property_type"].value_counts()
# Counts recorded for full_property_type:
# House: Mid-Terrace 136
# House: End-Terrace 83
# House: Semi-Detached 55
# Flat: Semi-Detached 24
# Flat: End-Terrace 19
# House: Detached 10
# Flat: Mid-Terrace 9
# Maisonette: Mid-Terrace 9
# Maisonette: Semi-Detached 8
# Maisonette: End-Terrace 6
# Flat: Detached 4
# Bungalow: Detached 1

epc_data = pd.read_csv(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/basildon_age_breakdowns/basildon EPC Data.csv"
)


def _floor_area_bracket(area):
    """Return the floor area bracket label (<73, 73-98, 99-200 or 200+ m2) for one floor area value."""
    if area < 73:
        return "<73"
    if area < 99:
        return "73-98"
    if area < 200:
        return "99-200"
    # Anything that is not below 200 (including values that fail every comparison) lands here
    return "200+"


# Classify floor area in <73m2, 73-98, 99-200, 200+
epc_data["floor_area_bracket"] = epc_data["total_floor_area"].apply(_floor_area_bracket)
# Counts recorded for floor_area_bracket:
# 73-98 185
# <73 156
# 99-200 23

# The wall type is the text before the first comma of the walls description
epc_data["wall_type"] = epc_data["walls"].str.split(",").str.get(0)
epc_data["wall_type"].value_counts()
# Counts recorded for wall_type:
# Cavity wall 343
# Timber frame 15
# System built 6
# we pull some additional data
# We want:
# 1) The list of properties included in the portfolio, with uprn
# 2) The recommendations against each property with costs, and whether or not the recommendation was defaulted
# 3) The properties without recommendations and why
from tqdm import tqdm
import pandas as pd
import numpy as np
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
def get_data(portfolio_id, scenario_ids):
    """
    Pull the properties, plans and default recommendations needed for the export.

    :param portfolio_id: ID of the portfolio whose properties (joined to their EPC details) are fetched
    :param scenario_ids: Collection of scenario IDs; only plans belonging to these scenarios are fetched
    :return: Tuple of (properties_data, plans_data, recommendations_data), each a list of dicts keyed by
        column name. Recommendation dicts carry an additional "Scenario ID" key.
    """
    session = sessionmaker(bind=db_engine)()
    try:
        session.begin()

        # Get properties and their details for a specific portfolio
        properties_query = session.query(
            PropertyModel,
            PropertyDetailsEpcModel
        ).join(
            PropertyDetailsEpcModel, PropertyModel.id == PropertyDetailsEpcModel.property_id
        ).filter(
            PropertyModel.portfolio_id == portfolio_id  # Filter by portfolio ID
        ).all()

        # Flatten each (property, epc details) row into one dict. The EPC details are unpacked last, so on a
        # column name clash the EPC details value overwrites the property value.
        properties_data = [
            {
                **{col.name: getattr(prop.PropertyModel, col.name) for col in PropertyModel.__table__.columns},
                **{
                    col.name: getattr(prop.PropertyDetailsEpcModel, col.name)
                    for col in PropertyDetailsEpcModel.__table__.columns
                },
            }
            for prop in properties_query
        ]

        # Get the plans that belong to the requested scenarios
        plans_query = session.query(Plan).filter(Plan.scenario_id.in_(scenario_ids)).all()
        plans_data = [
            {col.name: getattr(plan, col.name) for col in Plan.__table__.columns}
            for plan in plans_query
        ]

        # Extract plan IDs for filtering recommendations through PlanRecommendations
        plan_ids = [plan["id"] for plan in plans_data]

        # Get the default recommendations attached to those plans, along with the plan's scenario_id
        recommendations_query = session.query(
            Recommendation,
            Plan.scenario_id
        ).join(
            PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id
        ).join(
            Plan, Plan.id == PlanRecommendations.plan_id  # Join with Plan to access scenario_id
        ).filter(
            PlanRecommendations.plan_id.in_(plan_ids),
            Recommendation.default == True  # noqa: E712 - SQL expression, must stay a == comparison
        ).all()

        # Each row exposes the entity as .Recommendation and the extra column as .scenario_id
        recommendations_data = [
            {
                **{col.name: getattr(rec.Recommendation, col.name) for col in Recommendation.__table__.columns},
                "Scenario ID": rec.scenario_id,
            }
            for rec in recommendations_query
        ]
    finally:
        # Always release the connection, even when one of the queries raises
        session.close()

    return properties_data, plans_data, recommendations_data
# Build the Basildon export: one row per property with its attributes, the cost of each default recommended
# measure, a flag per measure and the predicted post-works SAP / EPC.
properties_data, plans_data, recommendations_data = get_data(portfolio_id=124, scenario_ids=[199])
properties_df = pd.DataFrame(properties_data)
plans_df = pd.DataFrame(plans_data)
recommendations_df = pd.DataFrame(recommendations_data)

# measure_type value -> label used in the "Cost: ..." and "Recommendation: ..." export columns
measure_labels = {
    "air_source_heat_pump": "Air Source Heat Pump",
    "cavity_wall_insulation": "Cavity Wall Insulation",
    "double_glazing": "Double Glazing",
    "loft_insulation": "Loft Insulation",
    "mechanical_ventilation": "Ventilation",
    "solar_pv": "Solar PV",
}
cost_columns = [f"Cost: {label}" for label in measure_labels.values()]
recommendation_columns = [f"Recommendation: {label}" for label in measure_labels.values()]

recommended_measures_df = recommendations_df[
    ["property_id", "measure_type", "estimated_cost", "default"]
]
recommended_measures_df = recommended_measures_df[recommended_measures_df["default"]]
recommended_measures_df = recommended_measures_df.drop(columns=["default"])

post_install_sap = recommendations_df[["property_id", "default", "sap_points"]]
post_install_sap = post_install_sap[post_install_sap["default"]]
# Sum up the sap points by property id
post_install_sap = post_install_sap.groupby("property_id")[["sap_points"]].sum().reset_index()

# One row per property, one column per measure type, holding the estimated cost
recommendations_measures_pivot = recommended_measures_df.pivot(
    index='property_id',
    columns='measure_type',
    values='estimated_cost'
)
recommendations_measures_pivot = recommendations_measures_pivot.reset_index()
recommendations_measures_pivot = recommendations_measures_pivot.rename(
    columns={measure: f"Cost: {label}" for measure, label in measure_labels.items()}
)
# A measure that was never recommended has no column after the pivot. Add it as all-zero so that the
# lookups below do not raise a KeyError.
for cost_column in cost_columns:
    if cost_column not in recommendations_measures_pivot.columns:
        recommendations_measures_pivot[cost_column] = 0
recommendations_measures_pivot = recommendations_measures_pivot.fillna(0)

# A measure is recommended for a property when it has a positive cost
for cost_column, recommendation_column in zip(cost_columns, recommendation_columns):
    recommendations_measures_pivot[recommendation_column] = recommendations_measures_pivot[cost_column] > 0

df = properties_df[
    [
        "property_id", "uprn", "address", "postcode", "property_type", "walls", "roof", "heating", "windows",
        "current_epc_rating",
        "current_sap_points", "total_floor_area", "number_of_rooms",
    ]
].merge(
    recommendations_measures_pivot, how="left", on="property_id"
).merge(
    post_install_sap, how="left", on="property_id"
)
df = df.drop(columns=["property_id"])
df["sap_points"] = df["sap_points"].fillna(0)
df = df.rename(
    columns={
        "uprn": "UPRN",
        "address": "Address",
        "postcode": "Postcode",
        "walls": "Walls",
        "roof": "Roof",
        "heating": "Heating",
        "windows": "Windows",
        "current_epc_rating": "Current EPC Rating",
        "current_sap_points": "Current SAP Points",
        "total_floor_area": "Total Floor Area",
        "number_of_rooms": "Number of Habitable Rooms",
        "floor_height": "Floor Height",
    }
)
# Properties with no pivot row come out of the left merge with missing costs, which marks them as having
# no recommendations. This must be computed before the missing values are filled below.
df["Has Recommendations"] = df["Cost: Air Source Heat Pump"].notna()

# We fill missings:
for recommendation_column in recommendation_columns:
    df[recommendation_column] = df[recommendation_column].fillna(False).astype(bool)
for cost_column in cost_columns:
    df[cost_column] = df[cost_column].fillna(0)

# Calculate post SAP
df["Predicted Post Works SAP"] = (df["Current SAP Points"] + df["sap_points"]).round()
df["Predicted Post Works EPC"] = df["Predicted Post Works SAP"].apply(sap_to_epc)

df.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Basildon Data Export - 2.csv", index=False)

View file

@ -1,9 +1,15 @@
import os
import pandas as pd
from dotenv import load_dotenv
from utils.s3 import save_csv_to_s3
from etl.find_my_epc.AssetListEpcData import AssetListEpcData
PORTFOLIO_ID = 120
PORTFOLIO_ID = 126
USER_ID = 8
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
"""
@ -13,11 +19,23 @@ def app():
asset_list = [
{
"uprn": 100030334057,
"address": "5, Lynton Street",
"postcode": "DE22 3RW"
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308249,
},
{
"address": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308251
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308250,
}
]
asset_list = pd.DataFrame(asset_list)
@ -29,40 +47,37 @@ def app():
file_name=filename
)
non_invasive_recommendations = [
{
"uprn": 100030334057,
"recommendations": [
{
"type": "internal_wall_insulation",
"sap_points": 9,
"survey": True
},
{
"type": "external_wall_insulation",
"sap_points": 9,
"survey": True
},
{
"type": "suspended_floor_insulation",
"sap_points": 2,
"survey": True
}
]
}
]
# Pull the non-invasive recommendations automatically
asset_list_epc_client = AssetListEpcData(
asset_list=asset_list,
epc_auth_token=EPC_AUTH_TOKEN
)
asset_list_epc_client.get_data()
asset_list_epc_client.get_non_invasive_recommendations()
# Store non-invasive recommendations in S3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
dataframe=pd.DataFrame(asset_list_epc_client.non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
valuation_data = [
{
"uprn": 100030334057,
"value": 133_000
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"valuation": 337_000
},
{
    "address": "Top Floor Flat, 48 Bedminster Parade",
    "postcode": "BS3 4HS",
    "valuation": 337_000
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"valuation": 337_000
}
]
# Store valuation data to s3

View file

@ -2905,5 +2905,38 @@ def identify_incorrect_packages():
os.path.join(CUSTOMER_FOLDER_PATH, "Units with assigned packages - with flags.csv"), index=False
)
def revised_model():
    """
    This function implements the revised model for Stonewater, where we are looking at new priority postcodes
    This work was undertaken in January 2025.

    NOTE(review): work in progress - at the moment this only loads the inputs and checks that every
    originally archetyped address appears in the new priority list.
    """
    # 1) Create the new list of properties
    new_priority_postcodes = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Jan 2025 Project/Updated 2025 to 2030 "
        "priority list.xlsx"
    )

    original_archetypes = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 "
        "- Archetyped V3.1.xlsx",
        header=4
    )
    # Drop rows without an Address ID and rows that repeat the header text, then make the ID numeric
    original_archetypes = original_archetypes[~pd.isnull(original_archetypes["Address ID"])]
    original_archetypes = original_archetypes[original_archetypes["Address ID"] != "Address ID"].copy()
    original_archetypes["Address ID"] = original_archetypes["Address ID"].astype(int)
    # NOTE(review): the original selection also listed an empty column name (""), which would raise a
    # KeyError; add the intended third column here once it is known.
    original_archetypes = original_archetypes[
        ["Address ID", "Archetype ID"]
    ]

    # Check if we have all of the addresses
    missed = original_archetypes[
        ~original_archetypes["Address ID"].isin(new_priority_postcodes["Address ID"].values)
    ]["Archetype ID"].unique()
    # NOTE(review): the original assert had no condition, which is a SyntaxError for the whole module.
    # Assumed intent: no archetyped address may be missing from the new priority list - confirm.
    assert len(missed) == 0, f"Archetypes with addresses missing from the new priority list: {list(missed)}"
# if __name__ == "__main__":
# main()

View file

@ -0,0 +1,137 @@
import os
import shutil
from tqdm import tqdm
def delete_large_files(
    folder_path="/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Wave 2.1 Surveys",
):
    """
    This function deletes photos, designs and other files which we don't need.

    The folder layout walked is folder_path/<sub-folder>/<property folder>/... For every property folder:
      * the listed photo / design / installer / lodgement folders are removed together with their contents
      * inside the RA coordinator info folder ("2. RA Coordinator Info", falling back to
        "1. RA Coordinator Info") files ending in .MOV or .jpg are removed, as is a "Property Pics" folder

    :param folder_path: Root folder holding the downloaded surveys. Defaults to the local Stonewater
        Wave 2.1 survey location.
    :return: None
    """
    folders_to_delete = (
        "1. RA Property Pics", "4. Air Tightness Tests", "5. RD Design Info",
        "1. RA Property PIcs", "Post EPC Photos", "4. RD Design Info",
        "5. Installer Info", "6. Trustmark lodgement", "7.Post Install Inspection Photos",
        "6. Trustmark Lodgement", "7. Post Inspection Photos",
    )

    # Each sub-folder of the root holds the property folders
    for subfolder in os.listdir(folder_path):
        subfolder_path = os.path.join(folder_path, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        for property_name in tqdm(os.listdir(subfolder_path)):
            property_path = os.path.join(subfolder_path, property_name)
            if not os.path.isdir(property_path):
                continue
            property_contents = os.listdir(property_path)

            for folder_to_delete in folders_to_delete:
                if folder_to_delete not in property_contents:
                    continue
                folder_to_delete_path = os.path.join(property_path, folder_to_delete)
                if os.path.isdir(folder_to_delete_path):
                    # Delete the folder, even if it's not empty
                    shutil.rmtree(folder_to_delete_path)

            # The coordinator info folder is numbered 2 in some property folders and 1 in others
            if "2. RA Coordinator Info" in property_contents:
                coordinator_folder = "2. RA Coordinator Info"
            else:
                coordinator_folder = "1. RA Coordinator Info"
            coordinator_info_path = os.path.join(property_path, coordinator_folder)
            if not os.path.isdir(coordinator_info_path):
                # Neither coordinator folder exists for this property, so there is nothing to clean up.
                # Listing the missing folder would otherwise abort the whole run.
                continue

            coordinator_info_contents = os.listdir(coordinator_info_path)
            # Look for .MOV files and .jpg files
            for file in coordinator_info_contents:
                if file.endswith((".MOV", ".jpg")):
                    os.remove(os.path.join(coordinator_info_path, file))

            if "Property Pics" in coordinator_info_contents:
                # Delete folder and contents
                shutil.rmtree(os.path.join(coordinator_info_path, "Property Pics"))
def download_data_from_sharepoint():
    """
    Download the retrofit assessment folders from SharePoint and store them on the local machine.

    For each of the "3. Wiltshire", "4. Bournemouth" and "5. Coventry" folders under the Stonewater property
    ID folders, every property folder is inspected and the first sub-folder whose name contains
    "ra coordinator info" (case-insensitive) is downloaded, excluding MOV files.

    The SharePoint credentials are read from the environment variables SHAREPOINT_TENANT_ID,
    SHAREPOINT_CLIENT_ID, SHAREPOINT_CLIENT_SECRET and SHAREPOINT_SITE_ID rather than being kept in source.
    NOTE(review): the client secret that was previously committed here should be rotated.

    :raises RuntimeError: if any of the credential environment variables is not set
    """
    from etl.access_reporting.app import SharePointClient

    # SharePointClient keyword argument -> environment variable holding its value
    credential_env_vars = {
        "tenant_id": "SHAREPOINT_TENANT_ID",
        "client_id": "SHAREPOINT_CLIENT_ID",
        "client_secret": "SHAREPOINT_CLIENT_SECRET",
        "site_id": "SHAREPOINT_SITE_ID",
    }
    missing = [env_var for env_var in credential_env_vars.values() if not os.getenv(env_var)]
    if missing:
        raise RuntimeError(f"Missing SharePoint credentials in environment: {', '.join(missing)}")
    sharepoint_client = SharePointClient(
        **{argument: os.environ[env_var] for argument, env_var in credential_env_vars.items()}
    )

    drive_id = sharepoint_client.document_drive["id"]
    sharepoint_root = "Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders"
    local_root = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Wave 2.1 Surveys"

    # Retrieve the data from Sharepoint and write to local machine
    contents = sharepoint_client.list_folder_contents(
        drive_id=drive_id,
        folder_path=sharepoint_root
    )
    folders_to_pull = [
        folder for folder in contents["value"] if folder["name"] in ["3. Wiltshire", "4. Bournemouth", "5. Coventry"]
    ]

    for folder_to_pull in folders_to_pull:
        # Get the contents
        folder_contents = sharepoint_client.list_folder_contents(
            drive_id=drive_id,
            folder_path=sharepoint_root + "/" + folder_to_pull["name"],
            page_size=100
        )

        for property_folder in folder_contents["value"]:
            # We go into each property folder and get the contents
            property_folder_contents = sharepoint_client.list_folder_contents(
                drive_id=drive_id,
                folder_path=sharepoint_root + "/" + folder_to_pull["name"] + "/" + property_folder["name"]
            )
            # We look for the retrofit assessment folder:
            property_sub_folders = [
                f for f in property_folder_contents["value"] if "ra coordinator info" in f["name"].lower()
            ]
            if not property_sub_folders:
                continue

            # If we have this, we download the folder and store it locally
            property_sub_folder = property_sub_folders[0]
            # Remote paths are joined with "/" (as in the listing calls above) instead of os.path.join,
            # so the separator does not depend on the local operating system
            property_folder_path = "/".join(
                [sharepoint_root, folder_to_pull["name"], property_folder["name"], property_sub_folder["name"]]
            )
            download_dir = os.path.join(
                local_root,
                folder_to_pull["name"],
                property_folder["name"],
                property_sub_folder["name"]
            )
            # We download the folder
            sharepoint_client.download_sharepoint_folder(
                drive_id=drive_id,
                folder_path=property_folder_path,
                download_dir=download_dir,
                excluded_file_types=["MOV"]
            )

View file

@ -7,6 +7,8 @@ from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from utils.s3 import read_from_s3, read_pickle_from_s3
import msoffcrypto
from io import BytesIO
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
@ -64,6 +66,28 @@ def app():
This code creates a list of cavity properties, for review
"""
# Read in the password protected master
# TODO: This file should be deleted!
# Path to the password-protected Excel file
file_path = ("/Users/khalimconn-kowlessar/Downloads/STONEWATER MASTER SHEET - UPDATED 20.5.24 - K- PASSWORD "
"PROTECTED.xlsx")
password = "STONE123" # Replace with the actual password
# Open the file and decrypt it
with open(file_path, "rb") as f:
decrypted_file = BytesIO()
office_file = msoffcrypto.OfficeFile(f)
office_file.load_key(password=password)
office_file.decrypt(decrypted_file)
# Read the decrypted file into a DataFrame
eco_rolling_master = pd.read_excel(decrypted_file, sheet_name="Sheet1", engine="openpyxl")
eco_rolling_master = eco_rolling_master[
~eco_rolling_master['INSTALL/CANCELLATION DATE'].str.contains("CANCELLED")
]
archetyped_properties = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 - "
"Archetyped V3.1.xlsx",
@ -116,13 +140,16 @@ def app():
features_to_merge = features[
[
"Address ID", "Age", "Property Type", "Walls", "Roofs", "Glazing", "Heating", "Main Fuel", "Hot Water",
"Address ID", "Organisation Reference", "Age", "Property Type", "Walls", "Roofs", "Glazing", "Heating",
"Main Fuel",
"Hot Water",
"Renewables", "Total Floor Area"
]
]
stonewater_cavity_properties = archetyped_properties[
["Name", "Postcode", "Osm. ID", "Address ID", "UPRN", "UDPRN", "Archetype ID", "House no", "Street name",
["Name", "Postcode", "Osm. ID", "Org. ref.", "Address ID", "UPRN", "UDPRN", "Archetype ID", "House no",
"Street name",
"Address line 2", "City/Town", "Is Cavity Property", "Survey shows CWI needed for Archetype"]
].merge(
features_to_merge, how="left", on="Address ID"
@ -166,77 +193,137 @@ def app():
stonewater_cavity_properties["Reason Included"]
)
# We flag units that were installed under ECO3
numeric_ids = eco_rolling_master[eco_rolling_master["STONEWATER UPRN"] != "NOT ON ASSET LIST"]
numeric_ids = numeric_ids[~pd.isnull(numeric_ids["STONEWATER UPRN"])]
numeric_ids["STONEWATER UPRN"] = numeric_ids["STONEWATER UPRN"].astype(int)
stonewater_cavity_properties["Installed under ECO3"] = stonewater_cavity_properties["Org. ref."].isin(
numeric_ids['STONEWATER UPRN'].values
)
# Which postcodes were installed under ECO3
priority_list_eco3 = stonewater_cavity_properties[
stonewater_cavity_properties["Installed under ECO3"]
]["Postcode"].unique()
# These are properties that were not installed under ECO3, that have the same postcodes as properties
# installed under ECO3
# These are 66 properties we might want to start with as an immediate priority
stonewater_cavity_properties["Same Postcode as Installed under ECO3"] = (
~stonewater_cavity_properties["Installed under ECO3"] & (
stonewater_cavity_properties["Postcode"].isin(priority_list_eco3)
)
)
# We get the EPC data
epc_data = json.loads(
read_from_s3(
bucket_name="retrofit-data-dev",
s3_file_name="customers/Stonewater/clustering/epc_data.json"
)
)
epc_data = pd.DataFrame(epc_data)
epc_data["uprn"] = np.where(
epc_data["internal_id"] == 1091,
83143766,
epc_data["uprn"]
)
epc_data_batch_2 = read_pickle_from_s3(
s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
bucket_name="retrofit-data-dev"
)
epc_data_batch_2 = pd.DataFrame(epc_data_batch_2)
complete_epcs = pd.concat([epc_data, epc_data_batch_2])
epcs_to_merge = complete_epcs[
[
"uprn",
"address",
"postcode",
"property-type",
"built-form",
"inspection-date",
"current-energy-rating",
"current-energy-efficiency",
"roof-description",
"walls-description",
"transaction-type",
"secondheat-description",
"total-floor-area",
"construction-age-band",
"floor-height",
"number-habitable-rooms",
"mainheat-description",
"energy-consumption-current"
]
].rename(
columns={
"address": "Address",
"postcode": "Postcode",
"inspection-date": "Date of last EPC",
"current-energy-efficiency": "SAP score on register",
"current-energy-rating": "EPC rating on register",
"property-type": "Property Type",
"built-form": "Archetype",
"total-floor-area": "Property Floor Area",
"construction-age-band": "Property Age Band",
"floor-height": "Property Floor Height",
"number-habitable-rooms": "Number of Habitable Rooms",
"walls-description": "Wall Construction",
"roof-description": "Roof Construction",
"mainheat-description": "Heating Type",
"secondheat-description": "Secondary Heating",
"transaction-type": "Reason for last EPC",
"energy-consumption-current": "Heat Demand (kWh/m2)",
}
)
# We de-dupe, taking the newest on the date the EPC was lod
epcs_to_merge["Date of last EPC"] = pd.to_datetime(epcs_to_merge["Date of last EPC"])
epcs_to_merge = epcs_to_merge.sort_values("Date of last EPC", ascending=False)
epcs_to_merge = epcs_to_merge.drop_duplicates(subset="uprn")
# epc_data = json.loads(
# read_from_s3(
# bucket_name="retrofit-data-dev",
# s3_file_name="customers/Stonewater/clustering/epc_data.json"
# )
# )
# epc_data = pd.DataFrame(epc_data)
#
# epc_data["uprn"] = np.where(
# epc_data["internal_id"] == 1091,
# 83143766,
# epc_data["uprn"]
# )
#
# epc_data_batch_2 = read_pickle_from_s3(
# s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
# bucket_name="retrofit-data-dev"
# )
# epc_data_batch_2 = pd.DataFrame(epc_data_batch_2)
#
# complete_epcs = pd.concat([epc_data, epc_data_batch_2])
#
# epcs_to_merge = complete_epcs[
# [
# "uprn",
# "address",
# "postcode",
# "property-type",
# "built-form",
# "inspection-date",
# "current-energy-rating",
# "current-energy-efficiency",
# "roof-description",
# "walls-description",
# "transaction-type",
# "secondheat-description",
# "total-floor-area",
# "construction-age-band",
# "floor-height",
# "number-habitable-rooms",
# "mainheat-description",
# "energy-consumption-current"
# ]
# ].rename(
# columns={
# "address": "Address",
# "postcode": "Postcode",
# "inspection-date": "Date of last EPC",
# "current-energy-efficiency": "SAP score on register",
# "current-energy-rating": "EPC rating on register",
# "property-type": "Property Type",
# "built-form": "Archetype",
# "total-floor-area": "Property Floor Area",
# "construction-age-band": "Property Age Band",
# "floor-height": "Property Floor Height",
# "number-habitable-rooms": "Number of Habitable Rooms",
# "walls-description": "Wall Construction",
# "roof-description": "Roof Construction",
# "mainheat-description": "Heating Type",
# "secondheat-description": "Secondary Heating",
# "transaction-type": "Reason for last EPC",
# "energy-consumption-current": "Heat Demand (kWh/m2)",
# }
# )
# # We de-dupe, taking the newest on the date the EPC was lod
# epcs_to_merge["Date of last EPC"] = pd.to_datetime(epcs_to_merge["Date of last EPC"])
# epcs_to_merge = epcs_to_merge.sort_values("Date of last EPC", ascending=False)
# epcs_to_merge = epcs_to_merge.drop_duplicates(subset="uprn")
stonewater_cavity_properties["UPRN"] = stonewater_cavity_properties["UPRN"].astype("Int64").astype(str)
stonewater_cavity_properties["Reason Included"].value_counts()
# Find the postcodes where an Osmosis survey revealed a need for CWI
postcodes_found_needing_cwi = stonewater_cavity_properties[
stonewater_cavity_properties["Reason Included"].isin(
[
"Survey revealed potential need for CWI or extract and re-fill",
"Surveyed revealed potential need for CWI or extract and re-fill and is an as built cavity property",
"Survey showed this property needs CWI",
"Survey showed this property could need extract and re-fill"
]
)
]["Postcode"].unique()
stonewater_cavity_properties["Suspected Needs CWI - not surveyed"] = (
(
stonewater_cavity_properties[
"Postcode"].isin(
postcodes_found_needing_cwi)
) & (
~stonewater_cavity_properties[
"Reason Included"].isin(
[
"Survey revealed potential need "
"for CWI or extract and re-fill",
"Surveyed revealed potential "
"need for CWI or extract and "
"re-fill and is an as built "
"cavity property",
"Survey showed this property "
"needs CWI",
"Survey showed this property "
"could need extract and re-fill"
]
)
)
)
# Merge the EPCs on, with the data we need
stonewater_cavity_properties = stonewater_cavity_properties.rename(
@ -252,12 +339,12 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
).merge(
epcs_to_merge,
how="left",
left_on="UPRN",
right_on="uprn"
)
) # .merge(
# epcs_to_merge,
# how="left",
# left_on="UPRN",
# right_on="uprn"
# )
# We now flag the additional properties in the as built list
@ -288,8 +375,56 @@ def app():
additional_properties = additional_properties.merge(house_numbers, how="left", on="Address ID")
additional_properties["row_id"] = additional_properties["Address ID"].copy()
# Flag any units in this list that were installed under ECO3
additional_properties["Installed under ECO3"] = additional_properties["Organisation Reference"].isin(
numeric_ids['STONEWATER UPRN'].values
)
# Additional list ECO3
additional_list_eco3 = additional_properties[additional_properties["Installed under ECO3"]]["Postcode"].unique()
# These are properties that were not installed under ECO3, that have the same postcodes as properties
# installed under ECO3
# These are 297 properties we might want to start with as an immediate priority
additional_properties["Same Postcode as Installed under ECO3"] = (
~additional_properties["Installed under ECO3"] & (
additional_properties["Postcode"].isin(additional_list_eco3)
)
)
# We do some additional manual checks for ECO3 properties that were installed but didn't get matched to either
# dataset
numeric_ids["In asset list"] = numeric_ids["STONEWATER UPRN"].isin(
stonewater_cavity_properties['Org. ref.'].astype(int).values
)
numeric_ids["In asset list"] = numeric_ids["In asset list"] | (
numeric_ids["STONEWATER UPRN"].isin(
additional_properties['Organisation Reference'].astype(int).values
)
)
# eco3_installs_not_in_asset_list = numeric_ids[~numeric_ids["In asset list"]]
# # We now take samples of properties randomly and manually check the ID against the asset list
# print(eco3_installs_not_in_asset_list.sample(1)[["STONEWATER UPRN", "Post Code", "NO ", "Street / Block Name", ]])
# # Checked STONEWATER UPRN
# # 9862, BH15 1NR, 33, THE QUAY FOYER [x]
# # 12785, S01 66PN, 57, SEACOLE GARDENS [x]
# # 26071, MK42 0TE, 51, De Havilland Avenue, Shortstown [x]
# # 18213, HR6 9UW, 20 Ford Street [x]
# # 24344, LU4 9FF, 6 SEAL CLOSE [x]
# # 31222, SN14 0QZ, 7 HARDBROOK COURT [x]
# # 9343, SP4 7XL, 10 OAK PLACE [x]
# # 34730, LU5 5TN, 4 TUDOR DRIVE [x]
# # 7021, BN27 2BZ, 32 BUTTS FIELD []
#
# stonewater_cavity_properties[stonewater_cavity_properties['Org. ref.'] == 7021]
# stonewater_cavity_properties[stonewater_cavity_properties['Postcode'] == "BN27 2BZ"]["Name"]
#
# additional_properties[additional_properties['Organisation Reference'] == 7021]
# additional_properties[additional_properties['Postcode'] == "BN27 2BZ"][["Address"]]
# Pull the EPCs for these properties
additional_properties_epcs, errors = get_data(additional_properties)
# additional_properties_epcs, errors = get_data(additional_properties)
# Save this data as a pickle
# import pickle
@ -297,12 +432,20 @@ def app():
# "wb") as f:
# pickle.dump(additional_properties_epcs, f)
additional_properties["Suspected Needs CWI - not surveyed"] = (
(
additional_properties["Postcode"].isin(postcodes_found_needing_cwi)
)
)
additional_properties["Same Postcode as Installed under ECO3"].value_counts()
# We drop Full Address
additional_properties = additional_properties.drop(columns=["Full Address"])
additional_properties2 = additional_properties[[
"row_id", "Address", "Postcode", "Address ID", "SAP", "SAP Band", "Property Type", "Walls", "Roofs", "Glazing",
"Heating", "Main Fuel", "Hot Water", "Renewables", "Total Floor Area",
"Address", "Postcode", "Address ID", "SAP", "SAP Band", "Property Type", "Walls", "Roofs", "Glazing",
"Heating", "Main Fuel", "Hot Water", "Renewables", "Total Floor Area", 'Installed under ECO3',
'Same Postcode as Installed under ECO3'
]].rename(
columns={
"SAP": "Parity - Predicted SAP",
@ -318,56 +461,58 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
).merge(
pd.DataFrame(additional_properties_epcs)[
[
"row_id",
"property-type",
"built-form",
"inspection-date",
"current-energy-rating",
"current-energy-efficiency",
"roof-description",
"walls-description",
"transaction-type",
"secondheat-description",
"total-floor-area",
"construction-age-band",
"floor-height",
"number-habitable-rooms",
"mainheat-description",
"energy-consumption-current"
]
].rename(
columns={
"inspection-date": "Date of last EPC",
"current-energy-efficiency": "SAP score on register",
"current-energy-rating": "EPC rating on register",
"property-type": "Property Type",
"built-form": "Archetype",
"total-floor-area": "Property Floor Area",
"construction-age-band": "Property Age Band",
"floor-height": "Property Floor Height",
"number-habitable-rooms": "Number of Habitable Rooms",
"walls-description": "Wall Construction",
"roof-description": "Roof Construction",
"mainheat-description": "Heating Type",
"secondheat-description": "Secondary Heating",
"transaction-type": "Reason for last EPC",
"energy-consumption-current": "Heat Demand (kWh/m2)",
}
),
how="left",
on="row_id"
)
) # .merge(
# pd.DataFrame(additional_properties_epcs)[
# [
# "row_id",
# "property-type",
# "built-form",
# "inspection-date",
# "current-energy-rating",
# "current-energy-efficiency",
# "roof-description",
# "walls-description",
# "transaction-type",
# "secondheat-description",
# "total-floor-area",
# "construction-age-band",
# "floor-height",
# "number-habitable-rooms",
# "mainheat-description",
# "energy-consumption-current"
# ]
# ].rename(
# columns={
# "inspection-date": "Date of last EPC",
# "current-energy-efficiency": "SAP score on register",
# "current-energy-rating": "EPC rating on register",
# "property-type": "Property Type",
# "built-form": "Archetype",
# "total-floor-area": "Property Floor Area",
# "construction-age-band": "Property Age Band",
# "floor-height": "Property Floor Height",
# "number-habitable-rooms": "Number of Habitable Rooms",
# "walls-description": "Wall Construction",
# "roof-description": "Roof Construction",
# "mainheat-description": "Heating Type",
# "secondheat-description": "Secondary Heating",
# "transaction-type": "Reason for last EPC",
# "energy-consumption-current": "Heat Demand (kWh/m2)",
# }
# ),
# how="left",
# on="row_id"
# )
# We save the data locally
stonewater_cavity_properties.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties.csv",
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties - priority "
"postcodes.csv",
index=False
)
additional_properties2.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties.csv",
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties - "
"non-priority postcodes.csv",
index=False
)
# Save the survey findings

View file

@ -44,6 +44,10 @@ epc_data["has_conservation_restrictions"] = (
| (epc_data["is_heritage_building"] == True)
)
whlg_eligible_postcodes["Local Authority"].value_counts()
whlg_eligible_postcodes = whlg_eligible_postcodes[whlg_eligible_postcodes["Local Authority"] == "Waltham Forest"]
# Pathway 1:
# Match based on eligible postcodes
pathway1 = epc_data[epc_data["postcode"].isin(whlg_eligible_postcodes["Postcode"].values)]
@ -67,6 +71,10 @@ pathway1["EPC Date"] = pd.to_datetime(pathway1["EPC Date"]).dt.strftime("%Y-%m-%
# Create a year EPC was lodged
pathway1["EPC Year"] = pd.to_datetime(pathway1["EPC Date"]).dt.year
low_epc = pathway1[pathway1["EPC Rating"].isin(["F", "G"])]
low_epc["EPC Rating"].value_counts()
low_epc.tail(1)[["address", "postcode"]]
pathway1.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Waltham Forest WHLG - Pathway 1 Eligibility.csv",
index=False

View file

@ -0,0 +1,94 @@
import time
import pandas as pd
from tqdm import tqdm
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.SearchEpc import SearchEpc
from utils.logger import setup_logger
# Module-level logger; AssetListEpcData.get_data uses it to report the start and end of a data pull
logger = setup_logger()
class AssetListEpcData:
    """
    Gathers EPC-related data for every home in an asset list.

    Typical usage is to call ``get_data`` first (which fills ``extracted_data``) and then
    ``get_non_invasive_recommendations`` (which fills ``non_invasive_recommendations``).
    """

    def __init__(self, asset_list: pd.DataFrame, epc_auth_token: str):
        """
        Store the asset list and credentials, and initialise the result attributes to None.

        :param asset_list: Table of homes; ``get_data`` reads the "address" and "postcode" fields of each row
            and an optional "uprn" field
        :param epc_auth_token: Token passed through to SearchEpc when looking up each home
        """
        # The asset list is passed through the (currently pass-through) column check before being stored
        self.asset_list = self.check_asset_list(asset_list)
        self.epc_auth_token = epc_auth_token
        # Populated by get_data and get_non_invasive_recommendations respectively
        self.extracted_data = None
        self.non_invasive_recommendations = None

    @staticmethod
    def check_asset_list(asset_list):
        """
        Validation hook for the asset list. No checks are performed yet; the input is returned unchanged.

        :param asset_list: The asset list to validate
        :return: The same asset list object
        """
        # TODO: Update this with pydantic
        return asset_list

    def get_non_invasive_recommendations(self):
        """
        Builds a trimmed copy of the extracted data, keeping only the uprn, address, postcode and
        recommendations of each record, and stores it on ``self.non_invasive_recommendations``.

        :raises ValueError: If ``get_data`` has not been run yet (``extracted_data`` is None)
        :return: None - the result is stored on the instance
        """
        if self.extracted_data is None:
            raise ValueError("Please run get_data first")

        trimmed_records = []
        for record in self.extracted_data:
            trimmed_records.append(
                {
                    # uprn is optional, the remaining keys must be present on every record
                    "uprn": record.get("uprn"),
                    "address": record["address"],
                    "postcode": record["postcode"],
                    "recommendations": record["recommendations"],
                }
            )

        # Only assign once every record has been processed successfully
        self.non_invasive_recommendations = trimmed_records

    def get_data(self):
        """
        Loops over the asset list and, for each home with an EPC, retrieves the newest find-my-epc data.
        Homes for which no EPC is found are left out. The collected records are stored on
        ``self.extracted_data``.

        :return: None - the result is stored on the instance
        """
        logger.info("Retrieving data for given asset list")

        n_homes = len(self.asset_list)
        collected = []

        for _, home in tqdm(self.asset_list.iterrows(), total=n_homes):
            address = home["address"]
            postcode = home["postcode"]
            uprn = home.get("uprn")

            # Look the home up via the EPC search, skipping the OS lookup (no OS api key is supplied)
            epc_lookup = SearchEpc(
                address1=address,
                postcode=postcode,
                uprn=uprn,
                auth_token=self.epc_auth_token,
                os_api_key=""
            )
            epc_lookup.find_property(skip_os=True)

            newest_epc = epc_lookup.newest_epc
            if newest_epc is not None:
                # Use the address details as recorded on the EPC to query find my epc
                find_my_epc = RetrieveFindMyEpc(
                    address=newest_epc["address1"],
                    postcode=newest_epc["postcode"]
                )
                find_epc_data = find_my_epc.retrieve_newest_find_my_epc_data()
                # Short pause between requests - presumably to avoid overloading the website, TODO confirm
                time.sleep(0.5)

                # We keep the uprn from the asset list alongside the retrieved data
                collected.append(
                    {
                        "uprn": uprn,
                        "address": address,
                        "postcode": postcode,
                        **find_epc_data,
                    }
                )

        self.extracted_data = collected
        logger.info("Data Extrction complete")

View file

@ -263,7 +263,7 @@ class RetrieveFindMyEpc:
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Change heating to gas condensing boiler": ["boiler_upgrade"],
"Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heaters"],
"Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heater"],
"Flat roof or sloping ceiling insulation": ["flat_roof_insulation"],
"Heating controls (room thermostat)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
@ -291,7 +291,7 @@ class RetrieveFindMyEpc:
"PV Cells recommendation": [],
"Replacement glazing units": ["double_glazing"],
"Heating controls (time and temperature zone control)": ["time_temperature_zone_control"],
"High heat retention storage heaters": ["high_heat_retention_storage_heaters"],
"High heat retention storage heaters": ["high_heat_retention_storage_heater"],
"Gas condensing boiler": ["boiler_upgrade"],
"Change room heaters to condensing boiler": ["boiler_upgrade"],
"Cylinder thermostat": ["cylinder_thermostat"],
@ -300,6 +300,8 @@ class RetrieveFindMyEpc:
"Fan assisted storage heaters": [],
"Fan-assisted storage heaters": [],
"Step 1:": [],
"Step 2:": [],
'Step 3:': [],
"Biomass stove with boiler": [],
"Replace boiler with biomass boiler": [],
"Heating controls (room thermostat and thermostatic radiator valves)": [
@ -308,7 +310,14 @@ class RetrieveFindMyEpc:
"Heating controls (programmer, and thermostatic radiator valves)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Replacement warm air unit": []
"Heating controls (programmer and TRVs)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Heating controls (programmer and room thermostat)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Replacement warm air unit": [],
"Secondary glazing": ["secondary_glazing"]
}
survey = True

35
etl/funding/app.py Normal file
View file

@ -0,0 +1,35 @@
"""
This script prepares the data required for us to perform funding calculations. The starting data should be stored
on the machine this is being run on, and this will prepare the information and upload it to S3
"""
import pandas as pd
from utils.s3 import save_csv_to_s3
# Deployment stage; substituted into DATA_BUCKET to select the target S3 bucket
STAGE = "dev"
# Bucket name template, formatted with STAGE before each upload
DATA_BUCKET = "retrofit-data-{stage}"
# Local path of the ECO4 project scores matrix CSV that app() reads
PROJECTS_SCORES_MATRIX_LOCATION = "/Users/khalimconn-kowlessar/Downloads/ECO4 Full Project Scores Matrix.csv"
# Local path of the Warm Homes Local Grant eligible postcodes workbook that app() reads
WHLG_ELIGIBLE_POSTCODES = "/Users/khalimconn-kowlessar/Downloads/WHLG-eligible-postcodes.xlsx"
def app():
    """
    Prepares the funding reference data held on this machine and uploads it to S3.

    Two datasets are handled:
      - the ECO4 project scores matrix, uploaded as read from the local CSV
      - the Warm Homes Local Grant eligible postcodes, reduced to a single lower-cased "Postcode" column

    Both are written to the bucket given by DATA_BUCKET for the configured STAGE, under the "funding/" prefix.

    :return: None
    """
    # Both uploads go to the same bucket, so we build the name once
    bucket_name = DATA_BUCKET.format(stage=STAGE)

    # Read in the project scores matrix
    project_scores_matrix = pd.read_csv(PROJECTS_SCORES_MATRIX_LOCATION)

    # Store in AWS S3
    save_csv_to_s3(
        dataframe=project_scores_matrix,
        bucket_name=bucket_name,
        file_name="funding/ECO4 Full Project Scores Matrix.csv"
    )

    # Read in the Warm Homes Local Grant eligible postcodes data
    whlg_eligible_postcodes = pd.read_excel(WHLG_ELIGIBLE_POSTCODES, sheet_name="Eligible postcodes", header=1)

    # We tidy up the data before we store. We take an explicit copy of the column selection so the
    # assignment below works on an independent frame and does not raise a SettingWithCopyWarning
    whlg_eligible_postcodes = whlg_eligible_postcodes[["Postcode"]].copy()
    whlg_eligible_postcodes["Postcode"] = whlg_eligible_postcodes["Postcode"].str.lower()

    save_csv_to_s3(
        dataframe=whlg_eligible_postcodes,
        bucket_name=bucket_name,
        file_name="funding/whlg eligible postcodes.csv"
    )

View file

@ -1,6 +1,5 @@
import os
import time
from idlelib.iomenu import errors
import pandas as pd
import numpy as np
@ -25,12 +24,11 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
epc_data = []
errors = []
no_epc = []
# home = asset_list[asset_list["row_id"] == errors[5]].squeeze()
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
try:
postcode = home[postcode_column]
house_number = home[address1_column]
full_address = home[fulladdress_column]
house_number = home[address1_column].strip()
full_address = home[fulladdress_column].strip()
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
@ -58,7 +56,13 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")[1].strip()
add1 = full_address.split(",")
if len(add1) > 1:
add1 = add1[1].strip()
else:
# Try splitting on space
add1 = full_address.split(" ")[0].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
@ -128,6 +132,10 @@ def extract_address1(asset_list, full_address_col, method="first_two_words"):
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
raise ValueError(f"Method {method} not recognized")
@ -154,17 +162,19 @@ def app():
Property UPRN
"""
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Watford"
DATA_FILENAME = "JS Mailing List 10122024.xlsx"
SHEET_NAME = "Export"
POSTCODE_COLUMN = "Postcode"
FULLADDRESS_COLUMN = "Property Address"
ADDRESS1_COLUMN = "Address Line 1"
ADDRESS1_METHOD = None
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Southern"
DATA_FILENAME = "January 2025 Additions Query.xlsx"
SHEET_NAME = "Jan 2025 additions"
POSTCODE_COLUMN = "Post Code"
FULLADDRESS_COLUMN = "Street / Block Name"
ADDRESS1_COLUMN = None
ADDRESS1_METHOD = "first_word"
ADDRESS_COLS_TO_CONCAT = []
# Maps addresses to uprn in problematic cases
MANUAL_UPRN_MAP = {}
MANUAL_UPRN_MAP = {
"Ardelagh Ardelagh Faris Lane Woodham Addlestone KT15 3DJ": 100061484560
}
asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
@ -213,6 +223,9 @@ def app():
manual_uprn_map=MANUAL_UPRN_MAP
)
no_data = asset_list[asset_list["row_id"].isin(no_epc)]
print(no_data[[FULLADDRESS_COLUMN, POSTCODE_COLUMN]])
# Append the failed data to the main data
epc_data.extend(epc_data_failed)
@ -372,7 +385,7 @@ def app():
how="left",
on="row_id"
)
asset_list = asset_list.drop(columns=["row_id"])
asset_list = asset_list.drop(columns=["row_id", "index"])
# Store as an excel
filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx"

View file

@ -719,8 +719,9 @@ class Costs:
"labour_days": labour_days
}
@classmethod
def solar_pv(
self,
cls,
n_panels: int | float,
has_battery: bool = False,
array_cost=None,
@ -774,7 +775,7 @@ class Costs:
# We add an additional cost for scaffolding
# The costs from installers exclude VAT
vat = subtotal * self.VAT_RATE
vat = subtotal * cls.VAT_RATE
total_cost = subtotal + vat
# Labour hours are based on estimates from online research but an average team seems to consist of 3 people

View file

@ -1,6 +1,5 @@
import re
import backend.app.assumptions as assumptions
from etl.customers.immo.pilot.asset_list import non_invasive_recommendations
from recommendations.Costs import Costs, BOILER_UPGRADE_SCHEME_ASHP_VALUE
from recommendations.recommendation_utils import (
check_simulation_difference, override_costs, combine_recommendation_configs
@ -632,7 +631,8 @@ class HeatingRecommender:
heating_controls_only,
system_change,
system_type,
measure_type
measure_type,
non_intrusive_recommendation=None
):
"""
Given a recommendation for heating controls, and a recommendation for the heating system, we combine the two
@ -650,8 +650,13 @@ class HeatingRecommender:
:param system_type: The type of heating system we are recommending
:param measure_type: The type of measure we are recommending - more granular than the "type" field, allowing us
to distinguish between different types of heating recommendations
:param non_intrusive_recommendation: A non-intrusive recommendation, which may specify the number of SAP points
or a cost for this recommendation
"""
if non_intrusive_recommendation is None:
non_intrusive_recommendation = {}
# We produce recommendations with & without heating controls
# We will also produce a recommendation for heating controls only
heating_controls_switch = [True, False] if controls_recommendations else [False]
@ -699,13 +704,14 @@ class HeatingRecommender:
"description": recommendation_description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"sap_points": non_intrusive_recommendation.get("sap_points"),
"already_installed": already_installed,
**total_costs,
"simulation_config": recommendation_simulation_config,
"description_simulation": recommendation_description_simulation,
# We insert the heating system type here
"system_type": system_type
"system_type": system_type,
"survey": non_intrusive_recommendation.get("survey", False)
}
output.append(recommendation)
@ -808,6 +814,13 @@ class HeatingRecommender:
# No recommendation needed
return
# We check if there is a high heat retention non-intrusive recommendation
non_intrusive_recommendation = next(
(r for r in self.property.non_invasive_recommendations if
r["type"] == "high_heat_retention_storage_heater"),
{}
)
# We check if the property has dual heating in place with a boiler and storage heaters
if self.dual_heating:
new_heating_description = self.DUAL_HEATING_DESCRIPTIONS[
@ -896,7 +909,8 @@ class HeatingRecommender:
heating_controls_only=heating_controls_only,
system_change=system_change,
system_type="high_heat_retention_storage_heater",
measure_type="high_heat_retention_storage_heater"
measure_type="high_heat_retention_storage_heater",
non_intrusive_recommendation=non_intrusive_recommendation
)
if _return:
return recommendations

View file

@ -528,6 +528,9 @@ class Recommendations:
previous_phase_values = {
"sap": float(property_instance.data["current-energy-efficiency"]),
# For carbon, even though we generally use the updated figure which includes the carbon
# associated to appliances, for this scoring process we use the EPC carbon value. This means
# that we don't overestimate the impact since the model uses the EPC carbon value
"carbon": float(property_instance.data["co2-emissions-current"]),
"heat_demand": float(property_instance.data["energy-consumption-current"]),
}
@ -691,6 +694,10 @@ class Recommendations:
"""
This method inserts the kwh savings and the bill savings that the customer will make from the recommendations
based on the predictions from the ML model
It also ensures we base our solar savings and solar carbon savings from the calculations based on
the solar API and size of the array, instead of ML model
:param property_instance: Instance of the Property class, for the home associated to property_id
:param kwh_simulation_predictions: dictionary of predictions from the model apis
:param property_recommendations: dictionary of recommendations for the property
@ -824,6 +831,12 @@ class Recommendations:
if rec["type"] == "solar_pv":
rec["kwh_savings"] = rec_impact["solar_kwh_savings"].values[0]
# Calculate carbon savings from this - emissions in kg and convert to tonnes
emissions_kg = rec["kwh_savings"] * assumptions.ELECTRICITY_CARBON_INTENSITY
emissions_tonnes = emissions_kg / 1000
rec["co2_equivalent_savings"] = emissions_tonnes
rec["energy_cost_savings"] = (
rec_impact["solar_kwh_savings"].values[0] * AnnualBillSavings.ELECTRICITY_PRICE_CAP
)

View file

@ -138,6 +138,10 @@ class RoofRecommendations:
u_value = self.property.roof["thermal_transmittance"]
# If we have a flat roof but we don't have flat roof as a measure, we exit
if self.property.roof["is_flat"] and "flat_roof_insulation" not in measures:
return
# We check if the roof is already insulated and if so, we exit
# Building regulations part L recommend installing at least 270mm of insulation, however generally we

View file

@ -103,13 +103,22 @@ class SolarPvRecommendations:
for rank, recommendation_config in best_configurations.iterrows():
# If we dont have the panneled_roof_area in the recommendation_config we calculate it
if recommendation_config.get("panneled_roof_area", None):
roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / total_roof_area * 100)
# We spread the coverage across the individual units
roof_coverage_percent = round(
((recommendation_config["panneled_roof_area"] / total_roof_area) * 100) / n_units
)
else:
raise Exception("IMPLEMENT ME")
n_floors = (
self.property.number_of_storeys["number_of_storeys"] if
self.property.number_of_storeys["number_of_storeys"] is not None else 3
)
total_cost = self.costs.solar_pv(
array_cost=recommendation_config.get("cost", None),
n_panels=recommendation_config["n_panels"],
n_floors=self.property.number_of_storeys["number_of_storeys"],
n_floors=n_floors,
needs_inverter=True,
)["total"] / n_units

View file

@ -111,8 +111,11 @@ county_to_region_map = {
'Windsor and Maidenhead': 'South East England', 'Woking': 'South East England', 'Wokingham': 'South East England',
'Worthing': 'South East England', 'Wycombe': 'South East England',
'Bath and North East Somerset': 'South West England', 'Bournemouth': 'South West England',
'Bristol': 'South West England', 'Cheltenham': 'South West England', 'Christchurch': 'South West England',
'City of Bristol': 'South West England', 'Cornwall': 'South West England', 'Cotswold': 'South West England',
'Bristol': 'South West England',
'Cheltenham': 'South West England', 'Christchurch': 'South West England',
'City of Bristol': 'South West England',
'Bristol, City of': 'South West England',
'Cornwall': 'South West England', 'Cotswold': 'South West England',
'Devon': 'South West England', 'Dorset': 'South West England', 'East Devon': 'South West England',
'East Dorset': 'South West England', 'Exeter': 'South West England', 'Forest of Dean': 'South West England',
'Gloucester': 'South West England', 'Gloucestershire': 'South West England',

View file

@ -23,6 +23,10 @@ def prepare_input_measures(property_recommendations, goal):
# if the recommendation is a solar recommendation with a battery, we exclude it from the optimisation.
recs = [r for r in recs if ~r["has_battery"]]
recs_to_append = [rec for rec in recs if rec["energy_cost_savings"] >= 0]
if not recs_to_append:
continue
input_measures.append(
[
{
@ -31,7 +35,7 @@ def prepare_input_measures(property_recommendations, goal):
"gain": rec[goal_key],
"type": rec["type"]
}
for rec in recs
for rec in recs if rec["energy_cost_savings"] >= 0
]
)