Merge pull request #328 from Hestia-Homes/survey-extraction

Survey extraction
This commit is contained in:
KhalimCK 2024-07-31 12:00:41 +01:00 committed by GitHub
commit e58c165a63
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 3488 additions and 374 deletions

View file

@ -76,12 +76,15 @@ class Property:
already_installed=None,
non_invasive_recommendations=None,
measures=None,
energy_assessment=None,
is_new=True,
**kwargs
):
self.epc_record = epc_record
self.id = id
self.is_new = is_new
self.address = address
self.postcode = postcode
@ -158,13 +161,14 @@ class Property:
self.floor_height = epc_record.prepared_epc.get("floor_height")
self.insulation_wall_area = None
self.floor_area = epc_record.prepared_epc.get("total_floor_area")
self.pitched_roof_area = None
self.roof_area = None
self.insulation_floor_area = None
self.number_lighting_outlets = epc_record.prepared_epc.get(
"fixed_lighting_outlets_count"
)
self.floor_level = None
self.number_of_windows = None
self.windows_area = None
self.solar_pv_percentage = None
self.current_adjusted_energy = None
@ -178,6 +182,12 @@ class Property:
self.recommendations_scoring_data = []
self.simulation_epcs = {}
# This additional condition data should change how we pass kwargs to this. We should no longer need to pass
# kwargs to this class, but instead, we should pass the energy assessment condition data
self.energy_assessment_condition_data = energy_assessment["condition"]
self.energy_assessment_is_newer = energy_assessment["energy_assessment_is_newer"]
# TODO: We keep this but only temporarily until we add bathrooms, bedrooms, building id to the condition data
self.parse_kwargs(kwargs)
@classmethod
@ -188,6 +198,10 @@ class Property:
:param kwargs:
:return:
"""
# Note - none of this data is contained in an energy asssessment, but we should consider how this is done
# as we collect more data from the energy assessment
n_bathrooms = kwargs.get("n_bathrooms", None)
if n_bathrooms not in [None, ""]:
# We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
@ -593,18 +607,12 @@ class Property:
def get_components(
self,
cleaned,
photo_supply_lookup,
floor_area_decile_thresholds,
energy_consumption_client
):
"""
Given the cleaning that has been performed, we'll use this to identify the property
components, from roof to walls to windows, heating and hot water
:param cleaned: This is the dictionary of components found in cleaner.cleaned
:param photo_supply_lookup: This is the lookup table for the photo supply, used to estimate the percentage
of the roof that is suitable for solar panels
:param floor_area_decile_thresholds: This is the decile thresholds for the floor area, used in estimating the
solar pv roof area
:param energy_consumption_client: Contains the heating and hot water kwh models - used to predict current
energy annual consumption in kWh
:return:
@ -669,20 +677,21 @@ class Property:
self.set_floor_type()
self.set_floor_level()
self.set_windows_count()
self.set_solar_panel_area(
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds,
)
self.set_energy_source()
self.find_energy_sources()
self.set_current_energy_bill(energy_consumption_client)
def set_solar_panel_configuration(self, solar_panel_configuration):
def set_solar_panel_configuration(
self, solar_panel_configuration, roof_area
):
"""
This funtion inserts the solar panel configuration into the property object
"""
self.solar_panel_configuration = solar_panel_configuration
# We also set the roof area
self.roof_area = roof_area
def set_current_energy_bill(self, energy_consumption_client):
"""
Given what we know about the property now, estimates the current energy consumption using the UCL paper
@ -697,17 +706,20 @@ class Property:
# Today's costs
todays_heating_cost = energy_consumption_client.convert_cost_to_today(
original_cost=float(self.data["heating-cost-current"]),
lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"])
lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"]).tz_localize(None)
)
todays_hot_water_cost = energy_consumption_client.convert_cost_to_today(
original_cost=float(self.data["hot-water-cost-current"]),
lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"])
lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"]).tz_localize(None)
)
todays_lighting_cost = energy_consumption_client.convert_cost_to_today(
original_cost=float(self.data["lighting-cost-current"]),
lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"])
lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"]).tz_localize(None)
)
# If we have the kwh figures, we don't need to predict them
condition_data = self.energy_assessment_condition_data.copy()
scoring_df = pd.DataFrame([self.epc_record.prepared_epc])
# Change columns from underscores to hyphens
scoring_df.columns = [
@ -717,13 +729,20 @@ class Property:
scoring_df[col] = None
energy_consumption_client.data = None
heating_prediction = energy_consumption_client.score_new_data(
new_data=scoring_df, target="heating_kwh"
)[0]
hot_water_prediction = energy_consumption_client.score_new_data(
new_data=scoring_df, target="hot_water_kwh"
)[0]
heating_prediction = (
float(condition_data["space_heating_kwh"]) if condition_data.get("space_heating_kwh") is not None
else energy_consumption_client.score_new_data(
new_data=scoring_df, target="heating_kwh"
)[0]
)
hot_water_prediction = (
float(condition_data["water_heating_kwh"]) if condition_data.get("water_heating_kwh") is not None
else energy_consumption_client.score_new_data(
new_data=scoring_df, target="hot_water_kwh"
)[0]
)
# We convert the lighting cost into kwh, just using the price cap
lighting_kwh = float(self.data["lighting-cost-current"]) / AnnualBillSavings.ELECTRICITY_PRICE_CAP
@ -861,7 +880,10 @@ class Property:
property_data = {
"creation_status": "READY",
"uprn": int(self.data["uprn"]),
"building_reference_number": int(self.data["building-reference-number"]),
"building_reference_number": (
int(self.data["building-reference-number"]) if
self.data["building-reference-number"] is not None else None
),
"has_pre_condition_report": True,
"has_recommendations": True,
"property_type": self.data["property-type"],
@ -1030,27 +1052,33 @@ class Property:
medians across the EPC data
:return:
"""
# Many of these pieces of information are now contained in the condition data
condition_data = self.energy_assessment_condition_data.copy()
# TODO: These functions should work on an EPCRecord object, so that the format is more standardised.
# They could also be added as attributes to the EPC Record
# We can update the number of floors if we have this information in the condition data
self.number_of_floors = int(self.energy_assessment_condition_data["number_of_floors"]) \
if condition_data.get("number_of_floors") is not None \
else self.number_of_floors
self.perimeter = estimate_perimeter(
self.floor_area / self.number_of_floors,
self.number_of_rooms / self.number_of_floors,
self.perimeter = float(self.energy_assessment_condition_data["perimeter"]) \
if condition_data.get("perimeter") is not None \
else estimate_perimeter(
floor_area=self.floor_area / self.number_of_floors,
num_rooms=self.number_of_rooms / self.number_of_floors
)
self.insulation_wall_area = estimate_external_wall_area(
self.insulation_wall_area = float(self.energy_assessment_condition_data["insulation_wall_area"]) \
if condition_data.get("insulation_wall_area") is not None \
else estimate_external_wall_area(
num_floors=self.number_of_floors,
floor_height=self.floor_height,
perimeter=self.perimeter,
built_form=self.data["built-form"],
)
self.insulation_floor_area = self.floor_area / self.number_of_floors
self.pitched_roof_area = esimtate_pitched_roof_area(
floor_area=self.insulation_floor_area, floor_height=self.floor_height
)
self.insulation_floor_area = float(self.energy_assessment_condition_data["main_dwelling_ground_floor_area"]) \
if condition_data.get("main_dwelling_ground_floor_area") is not None \
else self.floor_area / self.number_of_floors
def set_floor_level(self):
self.floor_level = (
@ -1148,7 +1176,11 @@ class Property:
:return:
"""
self.number_of_windows = estimate_windows(
condition_data = self.energy_assessment_condition_data.copy()
self.number_of_windows = int(condition_data["number_of_windows"]) \
if condition_data.get("number_of_windows") is not None \
else estimate_windows(
property_type=self.data["property-type"],
built_form=self.data["built-form"],
construction_age_band=self.construction_age_band,
@ -1156,47 +1188,9 @@ class Property:
number_habitable_rooms=self.number_of_rooms,
)
def set_solar_panel_area(self, photo_supply_lookup, floor_area_decile_thresholds):
"""
Sets the approximate area of the solar panels
:return:
"""
if (self.insulation_floor_area is None) and (self.pitched_roof_area is None):
raise ValueError(
"Need to set insulation floor area and pitched roof area before setting solar pv roof area"
)
photo_supply_matched = SolarPhotoSupply.filter_photo_supply_lookup(
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds,
tenure=self.data["tenure"],
built_form=self.data["built-form"],
property_type=self.data["property-type"],
construction_age_band=self.construction_age_band,
is_flat=self.roof["is_flat"],
is_pitched=self.roof["is_pitched"],
is_roof_room=self.roof["is_roof_room"],
floor_area=self.floor_area,
)
percentage_of_roof = photo_supply_matched["photo_supply_median"].mean()
percentage_of_roof = percentage_of_roof / 100
self.solar_pv_percentage = percentage_of_roof
def get_solar_pv_roof_area(self, percentage_of_roof):
"""
Given a percentage of the roof, this method will return the estimated area of the solar panels
:param percentage_of_roof:
:return:
"""
return (
self.insulation_floor_area * percentage_of_roof
if self.roof["is_flat"]
else self.pitched_roof_area * percentage_of_roof
)
self.windows_area = float(condition_data["windows_area"]) \
if condition_data.get("windows_area") is not None \
else None
def set_energy_source(self):
"""
@ -1282,3 +1276,79 @@ class Property:
self.hot_water_energy_source = self.heating_energy_source
else:
raise Exception("Investiage me")
def is_ashp_valid(self, exclusions):
if "air_source_heat_pump" in self.non_invasive_recommendations:
return True
if "air_source_heat_pump" in exclusions:
return False
suitable_property_type = self.data["property-type"] in ["House", "Bungalow"]
has_air_source_heat_pump = self.main_heating["has_air_source_heat_pump"]
return suitable_property_type and not has_air_source_heat_pump
def is_solar_pv_valid(self):
# If the property is a flat but we are looking at building solar potential, we can include this
if (self.building_id is not None) and (self.solar_panel_configuration is not None):
return True
is_valid_property_type = self.data["property-type"] in ["House", "Bungalow", "Maisonette"]
is_valid_roof_type = (
self.roof["is_flat"] or self.roof["is_pitched"] or self.roof["is_roof_room"]
)
# If there is no existing solar PV, the photo-supply field will be None or a missing value
has_no_existing_solar_pv = self.data["photo-supply"] in [
None, 0, self.DATA_ANOMALY_MATCHES
]
return is_valid_property_type and is_valid_roof_type and has_no_existing_solar_pv
def estimate_electrical_consumption(self, assumed_ashp_efficiency, exclusions):
"""
Given a property, this method estimates the electrical consumption of the property, based on the energy
consumption, the assumed efficiency of an ASHP and the exclusions.
What we're trying to do here is size up the future electricicty demand of the property, assuming that the
home is eligible for an ASHP. If the property is not eligible for an ASHP, we don't need to adjust the
consumption.
This figure is used to size up solar panels, so they can cover heat generation, even if the property
today doesn't generate its heat from electricity
:param assumed_ashp_efficiency:
:param exclusions:
:return:
"""
exclusions = [] if exclusions is None else exclusions
if (self.main_fuel["fuel_type"] == "electricity") or (
self.main_fuel["fuel_type"] == "mains gas" and not self.is_ashp_valid(exclusions=exclusions)
):
# if the primary fuel is already electricity, we don't need to adjust the consumpion
return self.current_adjusted_energy
if self.main_fuel["fuel_type"] == "mains gas" and self.is_ashp_valid(exclusions=exclusions):
# if the primary fuel is gas, we need to adjust the consumption to reflect the expected
# efficiency of an ASHP.
# We should adjust the energy consumption to reflect the 200-400% efficiency of an ASHP with
# electrified heating, so that the solar panel can cover heating generation.
heating_consumption = self.energy_consumption_estimates["adjusted"]["heating"]
hot_water_consumption = self.energy_consumption_estimates["adjusted"]["hot_water"]
systems_consumptions = heating_consumption + hot_water_consumption
adjusted_consumption = systems_consumptions / (assumed_ashp_efficiency / 100)
electric_consumption = (
adjusted_consumption +
self.energy_consumption_estimates["adjusted"]["lighting"] +
self.energy_consumption_estimates["adjusted"]["appliances"]
)
return electric_consumption
raise NotImplementedError("Have not implemented estimating electrical consumption for this fuel type")

View file

@ -8,6 +8,7 @@ import time
from backend.app.db.functions.solar_functions import get_solar_data, store_batch_data
from utils.logger import setup_logger
from sklearn.preprocessing import MinMaxScaler
from recommendations.Costs import Costs
logger = setup_logger()
@ -107,7 +108,14 @@ class GoogleSolarApi:
@lru_cache(maxsize=128)
def get(
self, longitude, latitude, energy_consumption, required_quality="MEDIUM", is_building=False, session=None,
self,
longitude,
latitude,
energy_consumption,
property_instance=None,
required_quality="MEDIUM",
is_building=False,
session=None,
uprn=None
):
"""
@ -115,7 +123,9 @@ class GoogleSolarApi:
:param longitude: The longitude of the location.
:param latitude: The latitude of the location.
:param energy_consumption: The energy consumption of the building/unit associated to the longitude and latitude.
:param energy_consumption: The energy consumption of the building/unit associated to the longitude and latitude,
that we wish to size the solar panels up against
:param property_instance: The property instance associated to the longitude and latitude.
:param required_quality: The required quality of the data (default is "MEDIUM").
:param is_building: Whether the energy consumption is for a building or a unit.
:param session: The database session to use for the query (default is None).
@ -158,7 +168,9 @@ class GoogleSolarApi:
self.roof_segment_indexes = [segment['segmentIndex'] for segment in self.roof_segments]
# We now start finding the solar panel configurations
self.optimise_solar_configuration(energy_consumption=energy_consumption, is_building=is_building)
self.optimise_solar_configuration(
energy_consumption=energy_consumption, is_building=is_building, property_instance=property_instance
)
def save_to_db(self, session, uprns_to_location, scenario_type):
if self.insights_data is None:
@ -178,7 +190,7 @@ class GoogleSolarApi:
"yearly_dc_energy",
"total_cost",
"panneled_roof_area",
"array_warrage",
"array_wattage",
"initial_ac_kwh_per_year",
"lifetime_ac_kwh",
"roi",
@ -191,7 +203,7 @@ class GoogleSolarApi:
"yearly_dc_energy": "yearly_dc_kwh",
"total_cost": "cost",
"panneled_roof_area": "panelled_roof_area",
"array_warrage": "array_kwhp",
"array_wattage": "array_kwhp",
"initial_ac_kwh_per_year": "yearly_ac_kwh",
}
)
@ -226,12 +238,14 @@ class GoogleSolarApi:
installation_life_span)) /
(1 - efficiency_depreciation_factor))
def optimise_solar_configuration(self, energy_consumption, is_building=False):
def optimise_solar_configuration(self, energy_consumption, is_building=False, property_instance=None):
"""
Optimise the solar panel configuration for the building.
:return:
"""
cost_instance = Costs(property_instance=property_instance) if property_instance is not None else None
# Remove any north facing roof segments
panel_performance = []
for config in self.insights_data["solarPotential"]["solarPanelConfigs"]:
@ -246,7 +260,14 @@ class GoogleSolarApi:
wattage = segment["panelsCount"] * self.insights_data["solarPotential"]["panelCapacityWatts"]
generated_dc_energy = segment["yearlyEnergyDcKwh"]
ratio = generated_dc_energy / wattage
cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000)
if cost_instance is None:
cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000)
else:
cost = cost_instance.solar_pv(
wattage=wattage, has_battery=False
)["total"]
roi_summary.append(
{
"segmentIndex": segment["segmentIndex"],
@ -274,7 +295,7 @@ class GoogleSolarApi:
"total_cost": total_cost,
"weighted_ratio": weighted_ratio,
"panneled_roof_area": roi_summary["panneled_roof_area"].sum(),
"array_warrage": roi_summary["n_panels"].sum() * self.panel_wattage
"array_wattage": roi_summary["n_panels"].sum() * self.panel_wattage
}
)
@ -290,7 +311,7 @@ class GoogleSolarApi:
# Remove anything where the total ac energy is less than half of the array wattage
panel_performance = panel_performance[
(panel_performance["initial_ac_kwh_per_year"] / panel_performance["array_warrage"]) >= 0.5
(panel_performance["initial_ac_kwh_per_year"] / panel_performance["array_wattage"]) >= 0.5
]
# 2) Calculate the liftime solar energy production
@ -311,12 +332,19 @@ class GoogleSolarApi:
)
# Now that we know the lifetime cnsumption of ac kwh, we can estimate the roi
# Key things we estimate:
# - generation_value: this is the gbp value of the electricity generated
# - roi: the return on investment, calcualated as generation_value / total_cost
# - surplus: this is the amount of additional energy generated, and therefore how much will be exported
# - surplus_value: the value of the surplus energy - this feeds into generation_value, when relevant
# - expected_payback_years: the number of years it will take to pay back the initial investment
lifetime_energy_consumption = energy_consumption * self.installation_life_span
roi_results = []
for _, panel_config in panel_performance.iterrows():
lifetime_ac_kwh = panel_config["lifetime_ac_kwh"]
surplus = 0
generation_deficit = 0
if lifetime_ac_kwh < lifetime_energy_consumption:
# We estimate the amount of electricity generated, based on the price cap
generation_value = lifetime_ac_kwh * AnnualBillSavings.ELECTRICITY_PRICE_CAP
@ -329,7 +357,6 @@ class GoogleSolarApi:
surplus_value = surplus * AnnualBillSavings.ELECTRICITY_EXPORT_PAYMENT
generation_value = lifetime_energy_consumption * AnnualBillSavings.ELECTRICITY_PRICE_CAP
roi = (generation_value + surplus_value) / panel_config["total_cost"]
generation_deficit = surplus_value
# Calculate expected payback years
if generation_value > 0:

View file

@ -0,0 +1,3 @@
# Assumes that the average efficiency of an air source heat pump is 300%, taking the median of the 200-400% range,
# which is often quoted as a sensible efficiency range for air source heat pumps.
AVERAGE_ASHP_EFFICIENCY = 300

View file

@ -0,0 +1,62 @@
from backend.app.db.models.energy_assessments import EnergyAssessment
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from typing import Optional
from sqlalchemy import desc
def bulk_insert_energy_assessments(session: Session, data_list):
"""
This function inserts or updates multiple energy assessment records into the database.
:param session: The SQLAlchemy session.
:param data_list: A list of dictionaries containing energy assessment data.
"""
try:
for data in data_list:
uprn = data.get('uprn')
inspection_date = data.get('inspection_date')
# Check if a record with the same uprn and inspection_date exists
existing_record = session.query(EnergyAssessment).filter_by(
uprn=uprn,
inspection_date=inspection_date
).first()
if existing_record:
# Update the existing record with new data
for key, value in data.items():
setattr(existing_record, key, value)
session.add(existing_record)
else:
# Insert a new record
new_assessment = EnergyAssessment(**data)
session.add(new_assessment)
# Commit the transaction
session.commit()
print("All records inserted or updated successfully.")
except IntegrityError as e:
# Rollback the session in case of error
session.rollback()
print(f"Error occurred: {e}")
def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[EnergyAssessment]:
"""
Retrieve the latest energy assessment for a given UPRN based on the inspection date.
:param session: The database session
:param uprn: The unique property reference number
:return: The latest EnergyAssessment object or None if not found
"""
try:
# Query the EnergyAssessment model, filter by uprn, order by inspection_date in descending order
latest_assessment = session.query(EnergyAssessment).filter_by(uprn=uprn).order_by(
desc(EnergyAssessment.inspection_date)).first()
return latest_assessment.to_dict() if latest_assessment else EnergyAssessment.empty_response()
except Exception as e:
print(f"An error occurred: {e}")
return None

View file

@ -1,10 +1,14 @@
from sqlalchemy import func
from backend.app.db.models.recommendations import Plan, PlanRecommendations, Recommendation
from backend.app.db.models.portfolio import Portfolio
from backend.app.db.models.recommendations import Plan, PlanRecommendations, Recommendation, Scenario
def aggregate_portfolio_recommendations(
session, portfolio_id: int, total_valuation_increase: float, labour_days: float, aggregated_data: dict
session,
portfolio_id: int,
scenario_id: int,
total_valuation_increase: float,
labour_days: float,
aggregated_data: dict
):
# Aggregate multiple fields
aggregates = (
@ -17,7 +21,11 @@ def aggregate_portfolio_recommendations(
)
.join(PlanRecommendations, PlanRecommendations.recommendation_id == Recommendation.id)
.join(Plan, Plan.id == PlanRecommendations.plan_id)
.filter(Plan.portfolio_id == portfolio_id, Plan.is_default == True, Recommendation.default == True)
.filter(
Plan.portfolio_id == portfolio_id,
Plan.scenario_id == scenario_id,
Recommendation.default == True
)
.one()
)
@ -30,16 +38,17 @@ def aggregate_portfolio_recommendations(
**aggregated_data
}
# Get the portfolio and update the fields
portfolio = session.query(Portfolio).filter_by(id=portfolio_id).one()
# Get the scenario and update the fields. This data needs to be stored against the scenario, not the portfolio
portfolio_scenario = session.query(Scenario).filter_by(id=scenario_id).one()
# Update the data
for key, value in aggregates_dict.items():
setattr(portfolio, key, value)
setattr(portfolio_scenario, key, value)
# Insert total valuation increase and labour days
portfolio.property_valuation_increase = total_valuation_increase
portfolio.labour_days = labour_days
portfolio_scenario.property_valuation_increase = total_valuation_increase
portfolio_scenario.labour_days = labour_days
# Merge the updated portfolio back into the session
session.merge(portfolio)
# Merge the updated portfolio plan back into the session
session.merge(portfolio_scenario)
session.flush()

View file

@ -1,8 +1,12 @@
from sqlalchemy import insert, delete
from sqlalchemy.orm import Session
from backend.app.db.models.recommendations import Plan, Recommendation, RecommendationMaterials, PlanRecommendations
from backend.app.db.models.portfolio import PropertyModel, PropertyTargetsModel, PropertyDetailsMeter, \
PropertyDetailsEpcModel
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.recommendations import (
Plan, Recommendation, RecommendationMaterials, PlanRecommendations, Scenario
)
from backend.app.db.models.portfolio import (
PropertyModel, PropertyTargetsModel, PropertyDetailsMeter, PropertyDetailsEpcModel
)
def create_plan(session: Session, plan):
@ -11,12 +15,38 @@ def create_plan(session: Session, plan):
:param session: The database session
:param plan: dictionary of data representing a plan to be created
"""
try:
new_plan = Plan(**plan)
session.add(new_plan)
session.flush()
session.commit()
return new_plan.id
except SQLAlchemyError as e:
session.rollback()
raise e
new_plan = Plan(**plan)
session.add(new_plan)
session.flush()
return new_plan.id
def create_scenario(session: Session, scenario):
"""
This function will create a record for the scenario in the database if it does not exist.
:param session: The database session
:param scenario: dictionary of data representing a scenario to be created
"""
try:
# Before creating a new scenario, we check if there is a scenario for this portfolio id already
# If there is, it means that any new scnario created will NOT be the default scenario
existing_scenario = session.query(Scenario).filter_by(portfolio_id=scenario["portfolio_id"]).first()
scenario["is_default"] = True if not existing_scenario else False
new_scenario = Scenario(**scenario)
session.add(new_scenario)
session.flush()
session.commit()
return new_scenario
except SQLAlchemyError as e:
session.rollback()
raise e
def create_recommendation(session: Session, recommendation):
@ -25,12 +55,15 @@ def create_recommendation(session: Session, recommendation):
:param session: The database session
:param recommendation: dictionary of data representing a recommendation to be created
"""
new_recommendation = Recommendation(**recommendation)
session.add(new_recommendation)
session.flush()
return new_recommendation.id
try:
new_recommendation = Recommendation(**recommendation)
session.add(new_recommendation)
session.flush()
session.commit()
return new_recommendation.id
except SQLAlchemyError as e:
session.rollback()
raise e
def create_recommendation_material(session: Session, recommendation_id, material_id, depth):
@ -68,62 +101,68 @@ def create_plan_recommendations(session: Session, plan_id, recommendation_ids):
session.execute(insert(PlanRecommendations).values(data))
def upload_recommendations(session: Session, recommendations_to_upload, property_id):
# Prepare data for bulk insert for Recommendation
recommendations_data = [
{
"property_id": property_id,
"type": rec["type"],
"description": rec["description"],
"estimated_cost": rec["total"],
"default": rec["default"],
"starting_u_value": rec.get("starting_u_value"),
"new_u_value": rec.get("new_u_value"),
"sap_points": rec["sap_points"],
"energy_savings": rec["heat_demand"],
"kwh_savings": rec["kwh_savings"],
"co2_equivalent_savings": rec["co2_equivalent_savings"],
"total_work_hours": rec["labour_hours"],
"energy_cost_savings": rec["energy_cost_savings"],
"labour_days": rec["labour_days"],
"already_installed": rec["already_installed"],
}
for rec in recommendations_to_upload
]
def upload_recommendations(session: Session, recommendations_to_upload, property_id, new_plan_id):
try:
# Prepare data for bulk insert for Recommendation
recommendations_data = [
{
"property_id": property_id,
"type": rec["type"],
"description": rec["description"],
"estimated_cost": rec["total"],
"default": rec["default"],
"starting_u_value": rec.get("starting_u_value"),
"new_u_value": rec.get("new_u_value"),
"sap_points": rec["sap_points"],
"energy_savings": rec["heat_demand"],
"kwh_savings": rec["kwh_savings"],
"co2_equivalent_savings": rec["co2_equivalent_savings"],
"total_work_hours": rec["labour_hours"],
"energy_cost_savings": rec["energy_cost_savings"],
"labour_days": rec["labour_days"],
"already_installed": rec["already_installed"],
}
for rec in recommendations_to_upload
]
session.bulk_insert_mappings(Recommendation, recommendations_data)
# Insert the recommendations, get back the IDs
stmt = insert(Recommendation).returning(Recommendation.id).values(recommendations_data)
result = session.execute(stmt)
uploaded_recommendation_ids = [row[0] for row in result]
# To get the IDs of the newly inserted recommendations, we need to flush the session
session.flush()
# Prepare data for bulk insert for RecommendationMaterials
recommendation_materials_data = [
{
"recommendation_id": recommendation_id,
"material_id": part["id"],
"depth": int(part["depth"]) if part["depth"] else None,
"quantity": part["quantity"],
"quantity_unit": part["quantity_unit"],
"estimated_cost": part["total"],
}
for rec, recommendation_id in zip(recommendations_to_upload, uploaded_recommendation_ids)
for part in rec["parts"]
]
# Map the uploaded_recommendation_ids with the original data for reference
uploaded_recommendation_ids = [rec.id for rec in session.query(Recommendation).filter(
Recommendation.property_id == property_id,
Recommendation.description.in_([rec["description"] for rec in recommendations_to_upload])
)]
session.bulk_insert_mappings(RecommendationMaterials, recommendation_materials_data)
# Prepare data for bulk insert for RecommendationMaterials
# We can have multiple materials per recommendation. The aggregation of the materials will total the
# recommendation figures
recommendation_materials_data = [
{
"recommendation_id": recommendation_id,
"material_id": part["id"],
"depth": int(part["depth"]) if part["depth"] else None,
"quantity": part["quantity"],
"quantity_unit": part["quantity_unit"],
"estimated_cost": part["total"],
}
for rec, recommendation_id in zip(recommendations_to_upload, uploaded_recommendation_ids)
for part in rec["parts"]
]
# flush the changes to get the newly created IDs
session.flush()
session.bulk_insert_mappings(RecommendationMaterials, recommendation_materials_data)
create_plan_recommendations(
session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
)
# flush the changes to get the newly created IDs
session.flush()
# Commit the transaction
session.commit()
return uploaded_recommendation_ids
return True
except SQLAlchemyError as e:
# Rollback the transaction in case of an error
session.rollback()
print(f"An error occurred: {e}")
return False
def clear_portfolio(session: Session, portfolio_id: int):
@ -148,6 +187,9 @@ def clear_portfolio(session: Session, portfolio_id: int):
# Delete all Plans associated with the portfolio
session.execute(delete(Plan).where(Plan.portfolio_id == portfolio_id))
# Delete all Scenarios associated with the portfolio
session.execute(delete(Scenario).where(Scenario.portfolio_id == portfolio_id))
# Delete all Recommendations associated with the properties
session.execute(delete(Recommendation).where(Recommendation.property_id.in_(property_ids)))

View file

@ -0,0 +1,165 @@
from sqlalchemy import Column, Integer, BigInteger, Text, Float, DateTime, Boolean, Date
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class EnergyAssessment(Base):
__tablename__ = 'energy_assessments'
id = Column(BigInteger, primary_key=True, autoincrement=True)
uprn = Column(BigInteger, nullable=False)
uprn_source = Column(Text, nullable=False)
property_type = Column(Text, nullable=False)
building_reference_number = Column(Text)
current_energy_efficiency = Column(Text, nullable=False)
current_energy_rating = Column(Text, nullable=False)
address1 = Column(Text, nullable=False)
address2 = Column(Text, nullable=False)
address3 = Column(Text)
posttown = Column(Text, nullable=False)
postcode = Column(Text, nullable=False)
address = Column(Text, nullable=False)
county = Column(Text)
constituency = Column(Text)
constituency_label = Column(Text)
low_energy_fixed_light_count = Column(Text, nullable=False)
construction_age_band = Column(Text, nullable=False)
mainheat_energy_eff = Column(Text, nullable=False)
windows_env_eff = Column(Text, nullable=False)
lighting_energy_eff = Column(Text, nullable=False)
environment_impact_potential = Column(Text, nullable=False)
mainheatcont_description = Column(Text, nullable=False)
sheating_energy_eff = Column(Text, nullable=False)
local_authority = Column(Text, nullable=False)
local_authority_label = Column(Text, nullable=False)
fixed_lighting_outlets_count = Column(Text, nullable=False)
energy_tariff = Column(Text, nullable=False)
mechanical_ventilation = Column(Text, nullable=False)
solar_water_heating_flag = Column(Text, nullable=False)
co2_emissions_potential = Column(Text, nullable=False)
number_heated_rooms = Column(Text, nullable=False)
floor_description = Column(Text, nullable=False)
energy_consumption_potential = Column(Text, nullable=False)
built_form = Column(Text, nullable=False)
number_open_fireplaces = Column(Text, nullable=False)
windows_description = Column(Text, nullable=False)
glazed_area = Column(Text, nullable=False)
inspection_date = Column(DateTime(timezone=True), nullable=False)
mains_gas_flag = Column(Text, nullable=False)
co2_emiss_curr_per_floor_area = Column(Text, nullable=False)
heat_loss_corridor = Column(Text, nullable=False)
unheated_corridor_length = Column(Text)
flat_storey_count = Column(Text)
roof_energy_eff = Column(Text, nullable=False)
total_floor_area = Column(Text, nullable=False)
environment_impact_current = Column(Text, nullable=False)
roof_description = Column(Text, nullable=False)
floor_energy_eff = Column(Text, nullable=False)
number_habitable_rooms = Column(Text, nullable=False)
hot_water_env_eff = Column(Text, nullable=False)
mainheatc_energy_eff = Column(Text, nullable=False)
main_fuel = Column(Text, nullable=False)
lighting_env_eff = Column(Text, nullable=False)
windows_energy_eff = Column(Text, nullable=False)
floor_env_eff = Column(Text, nullable=False)
sheating_env_eff = Column(Text, nullable=False)
lighting_description = Column(Text, nullable=False)
roof_env_eff = Column(Text, nullable=False)
walls_energy_eff = Column(Text, nullable=False)
photo_supply = Column(Text, nullable=False)
lighting_cost_potential = Column(Text, nullable=False)
mainheat_env_eff = Column(Text, nullable=False)
multi_glaze_proportion = Column(Text, nullable=False)
main_heating_controls = Column(Text, nullable=False)
flat_top_storey = Column(Text)
secondheat_description = Column(Text, nullable=False)
walls_env_eff = Column(Text, nullable=False)
transaction_type = Column(Text, nullable=False)
extension_count = Column(Text, nullable=False)
mainheatc_env_eff = Column(Text, nullable=False)
lmk_key = Column(Text)
wind_turbine_count = Column(Text, nullable=False)
tenure = Column(Text, nullable=False)
floor_level = Column(Text, nullable=False)
potential_energy_efficiency = Column(Text, nullable=False)
potential_energy_rating = Column(Text, nullable=False)
hot_water_energy_eff = Column(Text, nullable=False)
low_energy_lighting = Column(Text, nullable=False)
walls_description = Column(Text, nullable=False)
hotwater_description = Column(Text, nullable=False)
co2_emissions_current = Column(Text, nullable=False)
heating_cost_current = Column(Text, nullable=False)
heating_cost_potential = Column(Text, nullable=False)
hot_water_cost_current = Column(Text, nullable=False)
hot_water_cost_potential = Column(Text, nullable=False)
lighting_cost_current = Column(Text, nullable=False)
energy_consumption_current = Column(Text, nullable=False)
lodgement_date = Column(Date, nullable=False)
lodgement_datetime = Column(DateTime(timezone=False), nullable=False)
mainheat_description = Column(Text, nullable=False)
floor_height = Column(Float, nullable=False)
glazed_type = Column(Text, nullable=False)
file_location = Column(Text, nullable=False)
surveyor_name = Column(Text, nullable=False)
surveyor_company = Column(Text, nullable=False)
space_heating_kwh = Column(Text, nullable=False)
water_heating_kwh = Column(Text, nullable=False)
number_of_doors = Column(Integer, nullable=False)
number_of_insulated_doors = Column(Integer, nullable=False)
number_of_floors = Column(Integer, nullable=False)
insulation_wall_area = Column(Float, nullable=False)
heat_loss_perimeter = Column(Float, nullable=False)
party_wall_length = Column(Float, nullable=False)
perimeter = Column(Float, nullable=False)
rooms_with_bath_and_or_shower = Column(Integer)
rooms_with_mixer_shower_no_bath = Column(Integer)
room_with_bath_and_mixer_shower = Column(Integer)
percent_draftproofed = Column(Integer)
has_hot_water_cylinder = Column(Boolean)
cylinder_insulation_type = Column(Text)
cylinder_insulation_thickness = Column(Integer)
cylinder_thermostat = Column(Boolean)
main_dwelling_ground_floor_area = Column(Float)
number_of_windows = Column(Integer)
windows_area = Column(Float)
EPC_KEYS = [
'low_energy_fixed_light_count', 'address', 'uprn_source', 'floor_height', 'heating_cost_potential',
'unheated_corridor_length', 'hot_water_cost_potential', 'construction_age_band', 'potential_energy_rating',
'mainheat_energy_eff', 'windows_env_eff', 'lighting_energy_eff', 'environment_impact_potential', 'glazed_type',
'heating_cost_current', 'address3', 'mainheatcont_description', 'sheating_energy_eff', 'property_type',
'local_authority_label', 'fixed_lighting_outlets_count', 'energy_tariff', 'mechanical_ventilation',
'hot_water_cost_current', 'county', 'postcode', 'solar_water_heating_flag', 'constituency',
'co2_emissions_potential', 'number_heated_rooms', 'floor_description', 'energy_consumption_potential',
'local_authority', 'built_form', 'number_open_fireplaces', 'windows_description', 'glazed_area',
'inspection_date', 'mains_gas_flag', 'co2_emiss_curr_per_floor_area', 'address1', 'heat_loss_corridor',
'flat_storey_count', 'constituency_label', 'roof_energy_eff', 'total_floor_area', 'building_reference_number',
'environment_impact_current', 'co2_emissions_current', 'roof_description', 'floor_energy_eff',
'number_habitable_rooms', 'address2', 'hot_water_env_eff', 'posttown', 'mainheatc_energy_eff', 'main_fuel',
'lighting_env_eff', 'windows_energy_eff', 'floor_env_eff', 'sheating_env_eff', 'lighting_description',
'roof_env_eff', 'walls_energy_eff', 'photo_supply', 'lighting_cost_potential', 'mainheat_env_eff',
'multi_glaze_proportion', 'main_heating_controls', 'lodgement_datetime', 'flat_top_storey',
'current_energy_rating', 'secondheat_description', 'walls_env_eff', 'transaction_type', 'uprn',
'current_energy_efficiency', 'energy_consumption_current', 'mainheat_description', 'lighting_cost_current',
'lodgement_date', 'extension_count', 'mainheatc_env_eff', 'lmk_key', 'wind_turbine_count', 'tenure',
'floor_level', 'potential_energy_efficiency', 'hot_water_energy_eff', 'low_energy_lighting',
'walls_description', 'hotwater_description'
]
def to_dict(self):
"""
Convert the SQLAlchemy object to a dictionary.
"""
epc = {key.replace("_", "-"): getattr(self, key) for key in self.EPC_KEYS}
# Get everything else
condition = {
column.name: getattr(self, column.name)
for column in self.__table__.columns if column.name not in self.EPC_KEYS
}
return {"epc": epc, "condition": condition}
@staticmethod
def empty_response():
return {"epc": {}, "condition": {}}

View file

@ -50,8 +50,10 @@ class Plan(Base):
__tablename__ = 'plan'
id = Column(BigInteger, primary_key=True, autoincrement=True)
name = Column(String, nullable=True, default="")
portfolio_id = Column(BigInteger, ForeignKey(Portfolio.id), nullable=False)
property_id = Column(BigInteger, ForeignKey(PropertyModel.id), nullable=False)
scenario_id = Column(BigInteger, ForeignKey('scenario.id')) # Doesn't have to be linked to a scenario
created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
is_default = Column(Boolean, nullable=False)
valuation_increase_lower_bound = Column(Float)
@ -65,3 +67,46 @@ class PlanRecommendations(Base):
id = Column(BigInteger, primary_key=True, autoincrement=True)
plan_id = Column(BigInteger, ForeignKey('plan.id'), nullable=False)
recommendation_id = Column(BigInteger, ForeignKey('recommendation.id'), nullable=False)
class Scenario(Base):
__tablename__ = 'scenario'
id = Column(BigInteger, primary_key=True, autoincrement=True)
name = Column(String, nullable=False)
created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
budget = Column(Float)
portfolio_id = Column(BigInteger, ForeignKey(Portfolio.id), nullable=False)
housing_type = Column(String, nullable=False)
goal = Column(String, nullable=False)
trigger_file_path = Column(String, nullable=False)
already_installed_file_path = Column(String)
patches_file_path = Column(String)
non_invasive_recommendations_file_path = Column(String)
exclusions = Column(String)
multi_plan = Column(Boolean, default=False)
is_default = Column(Boolean, default=False, nullable=False)
# Add in the fields we need, which were previously sitting at the portfolio level
cost = Column(Float)
total_work_hours = Column(Float)
energy_savings = Column(Float)
co2_equivalent_savings = Column(Float)
energy_cost_savings = Column(Float)
epc_breakdown_pre_retrofit = Column(String)
epc_breakdown_post_retrofit = Column(String)
number_of_properties = Column(BigInteger)
n_units_to_retrofit = Column(BigInteger)
co2_per_unit_pre_retrofit = Column(String)
co2_per_unit_post_retrofit = Column(String)
energy_bill_per_unit_pre_retrofit = Column(String)
energy_bill_per_unit_post_retrofit = Column(String)
energy_consumption_per_unit_pre_retrofit = Column(String)
energy_consumption_per_unit_post_retrofit = Column(String)
valuation_improvement_per_unit = Column(String)
cost_per_unit = Column(String)
cost_per_co2_saved = Column(String)
cost_per_sap_point = Column(String)
valuation_return_on_investment = Column(String)
property_valuation_increase = Column(Float)
labour_days = Column(Float)

View file

@ -10,6 +10,7 @@ from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
import backend.app.assumptions as assumptions
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
@ -19,8 +20,9 @@ from backend.app.db.functions.property_functions import (
update_or_create_property_spatial_details
)
from backend.app.db.functions.recommendations_functions import (
create_plan, create_plan_recommendations, upload_recommendations
create_plan, create_plan_recommendations, upload_recommendations, create_scenario
)
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest, MdsRequest
@ -219,6 +221,68 @@ def extract_portfolio_aggregation_data(
return aggregation_data
def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
"""
This function will set up with epc_records dictionary with the newest EPC, the full SAP EPC and the older EPCs
and will factor in an energy assessment that we have performed for a client.
:param epc_searcher: An instance of the SearchEpc class
:param energy_assessment: The energy assessment we have performed. If we have not performed an energy assessment,
this should be an empty response as defined by the models's
EnergyAssessment.empty_response() method
"""
if not energy_assessment["epc"]:
energy_assessment_is_newer = False
return {
'original_epc': epc_searcher.newest_epc.copy(),
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}, energy_assessment_is_newer
epc = energy_assessment["epc"]
energy_assessment_date = epc["inspection-date"].strftime("%Y-%m-%d")
# We insert county into the epc, since right now this isn't something that we pull out from the energy
# assessment
epc["county"] = epc_searcher.newest_epc["county"]
epc["constituency"] = epc_searcher.newest_epc["constituency"]
# We check if the energy assessment is newer than the newest EPC
if pd.to_datetime(energy_assessment_date) > pd.to_datetime(epc_searcher.newest_epc["inspection-date"]):
# In this case, our energy assessment is newer than the EPCs available for this property
energy_assessment_is_newer = True
return {
"original_epc": epc,
"full_sap_epc": epc_searcher.full_sap_epc.copy(),
"old_data": epc_searcher.older_epcs.copy() + [epc_searcher.newest_epc.copy()]
}, energy_assessment_is_newer
# We check if the EPC we have produced is contained in the set of EPCs done for the property
# We do this based on inspection-date and SAP
epc_in_historicals = [
x for x in epc_searcher.older_epcs + [epc_searcher.newest_epc]
if x["inspection-date"] == energy_assessment_date and
x["current-energy-efficiency"] == epc["current-energy-efficiency"]
]
energy_assessment_is_newer = False
if epc_in_historicals:
# Then the EPC we have produced is already in the set of EPCs, and our EPC is older than the newest
return {
"original_epc": epc_searcher.newest_epc.copy(),
"full_sap_epc": epc_searcher.full_sap_epc.copy(),
"old_data": epc_searcher.older_epcs.copy()
}, energy_assessment_is_newer
# In this case, our EPC is older than the newest publically avaible one, but is not contained in
# the historicals, so it can't have been lodged, so we include it in the old data
return {
'original_epc': epc_searcher.newest_epc.copy(),
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy() + [epc],
}, energy_assessment_is_newer
router = APIRouter(
prefix="/plan",
tags=["plan"],
@ -233,9 +297,6 @@ async def trigger_plan(body: PlanTriggerRequest):
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
# TODO: We should store the trigger file path in the database with the plan so we can track the file that
# triggered the plan
# TODO: if the measure is already installed, it should actually be the very first phase
try:
@ -265,6 +326,7 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties = []
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
if uprn:
@ -281,27 +343,33 @@ async def trigger_plan(body: PlanTriggerRequest):
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True)
# We check for an energy assessment we have performed on this property:
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
# Create a record in db
property_id, is_new = create_property(
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
)
if not is_new:
if not is_new and not body.multi_plan:
continue
create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
if is_new:
create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
# Otherwise, we use the newest EPC
# energy_assessment_is_newer will tell us if the energy assessment is newer than the newest EPC that
# has been publically lodged
epc_records, energy_assessment["energy_assessment_is_newer"] = create_epc_records(
epc_searcher, energy_assessment
)
epc_records = {
'original_epc': epc_searcher.newest_epc.copy(),
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}
patch = next((
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {})
@ -326,18 +394,39 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties.append(
Property(
id=property_id,
is_new=is_new,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=property_already_installed,
non_invasive_recommendations=property_non_invasive_recommendations,
**Property.extract_kwargs(config)
energy_assessment=energy_assessment,
**Property.extract_kwargs(config), # TODO: Depraecate this
)
)
if not input_properties:
return Response(status_code=204)
# If we have any work to do, we create a new scenario
engine_scenario = create_scenario(
session=session,
scenario={
"name": body.scenario_name,
"created_at": created_at,
"budget": body.budget,
"portfolio_id": body.portfolio_id,
"housing_type": body.housing_type,
"goal": body.goal,
"trigger_file_path": body.trigger_file_path,
"already_installed_file_path": body.already_installed_file_path,
"patches_file_path": body.patches_file_path,
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
"exclusions": body.exclusions,
"multi_plan": body.multi_plan
}
)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
# the same data
@ -348,7 +437,6 @@ async def trigger_plan(body: PlanTriggerRequest):
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)
solar_api_client = GoogleSolarApi(api_key=get_settings().GOOGLE_SOLAR_API_KEY)
dataset_version = "2024-07-08"
@ -365,25 +453,48 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting spatial data")
for p in input_properties:
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds, energy_consumption_client)
p.get_components(cleaned=cleaned, energy_consumption_client=energy_consumption_client)
p.get_spatial_data(uprn_filenames)
# TODO: Handle the case of modelling some units as buildings and some as properties individually
logger.info("Performing solar analysis")
# TODO: Tidy this up
building_ids = [
{
"building_id": p.building_id,
"longitude": p.spatial["longitude"],
"latitude": p.spatial["latitude"],
# Energy consumption is adjusted for the property's expected post retrofit state
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": energy_consumption_client.estimate_new_consumption(
current_rating=p.data["current-energy-rating"],
target_rating=body.goal_value,
current_consumption=p.current_adjusted_energy
current_energy_efficiency=p.data["current-energy-efficiency"],
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
)
),
"property_id": p.id,
"uprn": p.uprn
} for p in input_properties if p.building_id is not None
]
individual_units = [
{
"longitude": p.spatial["longitude"],
"latitude": p.spatial["latitude"],
# Energy consumption is adjusted for the property's expected post retrofit state
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": energy_consumption_client.estimate_new_consumption(
current_energy_efficiency=p.data["current-energy-efficiency"],
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
),
),
"property_id": p.id,
"uprn": p.uprn
} for p in input_properties if p.building_id is None
]
if building_ids:
# Find the unique longitude and latitude pairs for each building id
unique_coordinates = {}
@ -447,14 +558,46 @@ async def trigger_plan(body: PlanTriggerRequest):
)
p.set_solar_panel_configuration(unit_solar_panel_configuration)
else:
# # Model the solar potential at the property level
# for p in input_properties:
# # TODO: Complete me! - we probably won't do this for individual flats
# solar_performance = solar_api_client.get(
# longitude=p.spatial["longitude"], latitude=p.spatial["latitude"]
# )
print("Implement me")
if individual_units:
# Model the solar potential at the property level
for unit in individual_units:
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and if now, skip
if not property_instance.is_solar_pv_valid():
continue
solar_api_client.get(
longitude=unit["longitude"],
latitude=unit["latitude"],
energy_consumption=unit["energy_consumption"],
is_building=False,
session=session,
uprn=unit["uprn"],
property_instance=property_instance
)
# Store the data in the database
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it exists
solar_api_client.save_to_db(
session=session,
uprns_to_location=[
{
"uprn": property_instance.uprn,
"longitude": property_instance.spatial["longitude"],
"latitude": property_instance.spatial["latitude"]
}
],
scenario_type="unit"
)
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"unit_share_of_energy": 1
},
roof_area=solar_api_client.roof_area
)
logger.info("Getting components and epc recommendations")
recommendations = {}
@ -610,18 +753,18 @@ async def trigger_plan(body: PlanTriggerRequest):
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc)
property_value_increase_ranges[p.id] = valuations
# Your existing operations
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
)
create_property_details_epc(session, property_details_epc)
if p.is_new:
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
)
create_property_details_epc(session, property_details_epc)
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
if not recommendations_to_upload:
continue
@ -629,7 +772,9 @@ async def trigger_plan(body: PlanTriggerRequest):
new_plan_id = create_plan(session, {
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True,
"scenario_id": engine_scenario.id,
"is_default": True if p.is_new else False,
"name": body.scenario_name,
"valuation_increase_lower_bound": (
valuations["lower_bound_increased_value"] - valuations["current_value"]
),
@ -641,10 +786,8 @@ async def trigger_plan(body: PlanTriggerRequest):
),
})
uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
create_plan_recommendations(
session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
upload_recommendations(
session, recommendations_to_upload, p.id, new_plan_id
)
property_valuation_increases.append(
@ -683,6 +826,7 @@ async def trigger_plan(body: PlanTriggerRequest):
aggregate_portfolio_recommendations(
session,
portfolio_id=body.portfolio_id,
scenario_id=engine_scenario.id,
total_valuation_increase=total_valuation_increase,
labour_days=labour_days,
aggregated_data=aggregated_data
@ -817,6 +961,7 @@ async def build_mds(body: MdsRequest):
# already_installed=property_already_installed,
# non_invasive_recommendations=property_non_invasive_recommendations,
measures=measures,
is_new=is_new,
**Property.extract_kwargs(config)
)
)

View file

@ -13,6 +13,10 @@ class PlanTriggerRequest(BaseModel):
patches_file_path: Optional[str] = None
non_invasive_recommendations_file_path: Optional[str] = None
exclusions: Optional[conlist(str, min_items=1)] = None
scenario_name: Optional[str] = ""
# If true, will allow us to create multiple plans for the same portfolio, whereas if this is false, if this property
# exists in the portfolio, it will be ignored
multi_plan: Optional[bool] = False
# Pre-defined list of possibilities for exclusions
_allowed_exclusions = {
@ -31,7 +35,7 @@ class PlanTriggerRequest(BaseModel):
"air_source_heat_pump",
}
_allowed_goals = {"Increase EPC"}
_allowed_goals = {"Increasing EPC"}
_allowed_housing_types = {"Social", "Private"}

View file

@ -100,6 +100,9 @@ class PropertyValuation:
200140647: 481_000,
200140648: 373_000,
200140649: 373_000,
# Vander Elliot Intrusive surveys
12103116: 1_537_000,
12103117: 1_404_000,
}
# We base our valuation uplifts on a number of sources

View file

@ -102,6 +102,7 @@ class EnergyConsumptionModel:
# We also retrieve the newest retail price comparison data which comes from Ofgem:
# https://www.ofgem.gov.uk/energy-data-and-research/data-portal/retail-market-indicators
# We use the detail price comparison by company and tariff type data
print("Reading retail price comparison - make sure this is up-to-date")
self.read_retail_price_comparison()
def read_retail_price_comparison(self):
@ -506,31 +507,36 @@ class EnergyConsumptionModel:
return prediction
@staticmethod
def calculate_percentage_decrease(start_rating, end_rating, consumption_averages):
def calculate_percentage_decrease(start_efficiency, end_efficiency, consumption_averages):
start_consumption = consumption_averages.loc[
consumption_averages["current-energy-rating"] == start_rating, "total_consumption"
consumption_averages["current-energy-efficiency"].astype(str) == str(start_efficiency), "total_consumption"
].values[0]
end_consumption = consumption_averages.loc[
consumption_averages["current-energy-rating"] == end_rating, "total_consumption"
consumption_averages["current-energy-efficiency"].astype(str) == str(end_efficiency), "total_consumption"
].values[0]
percentage_decrease = ((start_consumption - end_consumption) / start_consumption) * 100
# percentage_decrease cannot be nehative
if percentage_decrease < 0:
percentage_decrease = 0
return percentage_decrease
def estimate_new_consumption(self, current_rating, target_rating, current_consumption):
def estimate_new_consumption(self, current_energy_efficiency, target_efficiency, current_consumption):
"""
Given then consumption_averages dataset, which is produced as a result of the data_combining.py script,
for the energy kwh models, this function will estimate the new consumption based on the current consumption,
based on the expected reduction in consumption from the current rating to the target rating.
:param current_rating:
:param target_rating:
:param current_energy_efficiency:
:param target_efficiency:
:param current_consumption:
:param df:
:return:
"""
percentage_decrease = self.calculate_percentage_decrease(
current_rating, target_rating, self.consumption_averages
start_efficiency=current_energy_efficiency,
end_efficiency=target_efficiency,
consumption_averages=self.consumption_averages
)
new_consumption = current_consumption * (1 - percentage_decrease / 100)
return new_consumption

View file

@ -133,7 +133,7 @@ def app():
energy_consumption_data = []
for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)):
# Skip the first 50
if i < 250:
if i < 57:
continue
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
@ -146,12 +146,12 @@ def app():
# Take just the newest EPC per uprn, based on lodgement-date
data = data.sort_values("lodgement-date", ascending=False).drop_duplicates("uprn")
data = data.sample(sample_size)
data = data.sample(sample_size, replace=False)
# We use the addreess data to find the related information
collected_data = []
for _, property_data in data.iterrows():
time.sleep(np.random.uniform(0.3, 2))
time.sleep(np.random.uniform(0.2, 1.5))
uprn = int(property_data["uprn"])
address = property_data["address1"]

View file

@ -94,7 +94,7 @@ def app():
# We also estimate the energy consumption reduction from this data, by band
df["total_consumption"] = df["heating_kwh"] + df["hot_water_kwh"]
consumption_averages = df.groupby("current-energy-rating")["total_consumption"].meam().reset_index()
consumption_averages = df.groupby("current-energy-efficiency")["total_consumption"].mean().reset_index()
# Save the consumption averages back to s3
save_dataframe_to_s3_parquet(

View file

@ -11,7 +11,10 @@ from utils.s3 import read_dataframe_from_s3_parquet
# The mode EPC rating is D, so we associate the £238k valuation with an EPC D property
# Therefore value_of_F * 1.15 = value_of_D * 1.03
# Therefore value_of_F = value_of_D * 1.03/1.15 = 238k * (1.03/1.15) = 213165
PROPERTY_VALUE_ESTIMATE = 213_165
PROPERTY_VALUE_ESTIMATE = 200_000
# UPRNs of properties we need
MANUAL_EXCLUSIONS = []
def aggregate_matches(matching_lookup, company_ownership, properties):
@ -73,7 +76,7 @@ def find_f_g_properties(paths):
epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str)
# Get the newest EPC for each UPRN. We use LODGEMENT_DATE as a proxy for this
epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], format='mixed')
epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], format='mixed', errors="coerce")
epc_data = epc_data.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")
@ -84,7 +87,7 @@ def find_f_g_properties(paths):
data = pd.concat(data)
# Save as an excel
data.to_excel("EPC F & G Properties.xlsx", index=False)
data.to_excel("EPC F & G Properties - V2.xlsx", index=False)
def remove_text_in_brackets(address: str) -> str:
@ -196,7 +199,7 @@ def remove_duplicate_matches(matching_lookup, properties, company_ownership):
matches_to_drop[["UPRN", "Title Number"]].copy()
)
to_drop = pd.concat(to_drop)
to_drop = pd.concat(to_drop) if to_drop else pd.DataFrame()
if not to_drop.empty:
merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
@ -245,6 +248,74 @@ def remove_duplicate_uprn_matches(matching_lookup, properties, company_ownership
return matching_lookup
def filter_land_registry(properties):
column_names = [
"transaction_id",
"price",
"date_of_transfer",
"postcode",
"property_type",
"old_new",
"duration",
"paon",
"saon",
"street",
"locality",
"town_city",
"district",
"county",
"ppd_category_type",
"record_status",
]
land_registry = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/pp-complete.csv", header=None)
land_registry.columns = column_names
land_registry = land_registry[
land_registry["postcode"].str.lower().isin(properties["POSTCODE"].str.lower().unique())
]
land_registry["date_of_transfer"] = pd.to_datetime(
land_registry["date_of_transfer"], format="%Y-%m-%d", errors="coerce"
)
# Take data from the last 5 years
land_registry = land_registry[
(land_registry["date_of_transfer"] >= "2019-01-01")
]
# Filter this
land_registry.to_csv(
"/Users/khalimconn-kowlessar/Downloads/land_registry_prices_paid_filtered.csv", index=False
)
def is_substring(x, match_string):
if pd.isnull(x):
return False
return x in match_string.lower()
def house_number_match(paon, house_number):
# Firstly try and convert to numberic
try:
paon_numeric = int(paon)
house_number_numeric = int(house_number)
return paon_numeric == house_number_numeric
except Exception as e: # noqa
# If we can't convert both to numeric, we do an equality
return paon == house_number
def check_equalities(lr_filtered):
all_paon_equal = all(lr_filtered["paon"] == lr_filtered["paon"].values[0])
if pd.isnull(lr_filtered["saon"].values[0]):
all_saon_equal = all(pd.isnull(lr_filtered["saon"]))
else:
all_saon_equal = all(lr_filtered["saon"] == lr_filtered["saon"].values[0])
all_street_equal = all(lr_filtered["street"] == lr_filtered["street"].values[0])
return all_paon_equal, all_saon_equal, all_street_equal
def app():
"""
This script is for scoping property ownership for EPC F & G rated properties in Birmingam, for Goldman Sachs
@ -254,8 +325,8 @@ def app():
# https://epc.opendatacommunities.org/domestic/search?address=&postcode=&local-authority=&constituency
# =&uprn=100031179243&from-month=1&from-year=2008&to-month=12&to-year=2024
# is actually listed in two local authorities causing us to think it's an EPC F & G property, but it's
# it's actually EPC E. Need to handle this, probably by reading in all of the EPC data, concatenating together
# and performing a singular filter for most recent EPC by UPRN
# it's actually EPC E. Need to handle this, probably by reading in all of the EPC data, concatenating
# together and performing a singular filter for most recent EPC by UPRN
# paths = [
# "local_data/all-domestic-certificates/domestic-E08000025-Birmingham/certificates.csv",
# "local_data/all-domestic-certificates/domestic-E08000031-Wolverhampton/certificates.csv",
@ -293,17 +364,19 @@ def app():
# paths = list(set(paths))
# find_f_g_properties(paths)
properties = pd.read_excel("EPC F & G Properties.xlsx")
company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/CCOD_FULL_2024_04.csv")
properties = pd.read_excel("EPC F & G Properties - V2.xlsx")
# filter_land_registry(properties)
company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/CCOD_FULL_2024_07.csv")
company_ownership["is_overseas"] = False
overseas_company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/OCOD_FULL_2024_04 2.csv")
overseas_company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/OCOD_FULL_2024_07.csv")
overseas_company_ownership["is_overseas"] = True
company_ownership = pd.concat([company_ownership, overseas_company_ownership])
# FIlter on relevant postcodes
company_ownership = company_ownership[
company_ownership["Postcode"].str.lower().isin(properties["POSTCODE"].str.lower().unique())]
company_ownership["Postcode"].str.lower().isin(properties["POSTCODE"].str.lower().unique())
]
# Now we filter properties the other way around
properties = properties[properties["POSTCODE"].str.lower().isin(company_ownership["Postcode"].str.lower().unique())]
@ -414,13 +487,11 @@ def app():
freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup)
leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup)
shared_leasehold_match = pd.concat(shared_leasehold_match)
shared_freehold_match = pd.concat(shared_freehold_match)
# freehold_matching_lookup.to_excel("freehold_matching_lookup_new.xlsx")
# leasehold_matching_lookup.to_excel("leasehold_matching_lookup_new.xlsx")
# shared_leasehold_match.to_excel("shared_leasehold_match_new.xlsx")
# shared_freehold_match.to_excel("shared_freehold_match_new.xlsx")
# freehold_matching_lookup.to_excel("freehold_matching_lookup V2.xlsx")
# leasehold_matching_lookup.to_excel("leasehold_matching_lookup V2.xlsx")
# freehold_matching_lookup = pd.read_excel("freehold_matching_lookup V2.xlsx")
# leasehold_matching_lookup = pd.read_excel("leasehold_matching_lookup V2.xlsx")
# The approximate matches aren't very good
freehold_matching_lookup = freehold_matching_lookup[freehold_matching_lookup["match_type"] == "exact"]
@ -429,23 +500,309 @@ def app():
# Combine
combined_matching_lookup = pd.concat([freehold_matching_lookup, leasehold_matching_lookup])
# Remove duplicates
combined_matching_lookup = remove_duplicate_matches(combined_matching_lookup, properties, company_ownership)
combined_matching_lookup = remove_duplicate_matches(
matching_lookup=combined_matching_lookup, properties=properties, company_ownership=company_ownership
)
# We also have duplicates at a UPRN level
combined_matching_lookup = remove_duplicate_uprn_matches(combined_matching_lookup, properties, company_ownership)
# There are some cases where we have duplicates
# freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership)
# leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership)
matched_addresses = combined_matching_lookup.merge(
properties[["UPRN", "ADDRESS", "CURRENT_ENERGY_EFFICIENCY", "CURRENT_ENERGY_RATING"]].rename(
columns={"ADDRESS": "epc_address"}),
properties[
[
"UPRN",
"ADDRESS",
"ADDRESS1",
"CURRENT_ENERGY_EFFICIENCY",
"CURRENT_ENERGY_RATING",
"POSTCODE",
"LODGEMENT_DATE",
"TRANSACTION_TYPE"
]
].rename(
columns={
"ADDRESS": "epc_address",
"ADDRESS1": "epc_address1",
"POSTCODE": "epc_postcode"
}
),
how="left", on="UPRN"
).merge(
company_ownership[["Title Number", "Property Address", "Company Registration No. (1)", "Proprietor Name (1)"]],
company_ownership[
[
"Title Number",
"Property Address",
"Postcode",
"Company Registration No. (1)",
"Proprietor Name (1)",
"Date Proprietor Added",
]
],
how="left", on="Title Number"
)
# Let's try and get the house number
matched_addresses["house_number"] = (
matched_addresses["epc_address"]
.apply(remove_text_in_brackets)
.apply(SearchEpc.get_house_number)
.str.lower()
.str.replace(",", "")
)
# Read in land registry
land_registry = pd.read_csv(
"/Users/khalimconn-kowlessar/Downloads/land_registry_prices_paid_filtered.csv",
)
# We now perform a match between the land registry data and the matched address, in an attempt to find
# out when these properties last sold. The land registry data has been pre filtered on the postcodes in this
# data, and for sales within the last 5 years, to ensure the file isn't too large.
land_registry["postcode"] = land_registry["postcode"].str.lower().str.strip()
land_registry["street"] = land_registry["street"].str.lower().str.strip()
land_registry["paon"] = land_registry["paon"].str.lower().str.strip()
land_registry["saon"] = land_registry["saon"].str.lower().str.strip()
land_registry["date_of_transfer"] = pd.to_datetime(land_registry["date_of_transfer"])
land_registry_matches = []
for _, match in tqdm(matched_addresses.iterrows(), total=len(matched_addresses)):
# Filter land registry on the postcode
lr_filtered = land_registry[
(land_registry["postcode"] == match["epc_postcode"].lower().strip())
]
# Filter further, when the street is in in the address
# street should be contained in epc_address
lr_filtered = lr_filtered[
lr_filtered["street"].apply(lambda x: is_substring(x, match["epc_address"].lower())) |
lr_filtered["street"].apply(lambda x: is_substring(x, match["Property Address"].lower()))
]
if lr_filtered.empty:
continue
# We now check if paon is in address 1
lr_filtered["paon_match"] = lr_filtered["paon"].apply(lambda x: house_number_match(x, match["house_number"]))
# We also try the secondary match
lr_filtered["saon_match"] = (
lr_filtered["saon"].apply(
lambda x: False if pd.isnull(x) else is_substring(x, match["epc_address1"])
)
)
# We fileter where we have a primary or secondary match
lr_filtered = lr_filtered[
lr_filtered["paon_match"] | lr_filtered["saon_match"]
]
if lr_filtered.empty:
continue
elif lr_filtered.shape[0] == 1:
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
continue
elif lr_filtered.shape[0] > 1:
# We make sure all records are the same and take the newest
all_paon_equal, all_saon_equal, all_street_equal = check_equalities(lr_filtered)
has_paon_match = any(lr_filtered["paon_match"])
if all_paon_equal and all_street_equal and all_saon_equal:
# Take the newest record, append and continue
lr_filtered = lr_filtered.sort_values("date_of_transfer", ascending=False)
lr_filtered = lr_filtered.head(1)
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
continue
elif has_paon_match and all_street_equal:
# Peform filter on paon
lr_filtered = lr_filtered[lr_filtered["paon_match"]]
# Do an addtiioanl equality check
all_paon_equal, all_saon_equal, all_street_equal = check_equalities(lr_filtered)
if all_paon_equal and all_street_equal and all_saon_equal:
lr_filtered = lr_filtered.sort_values("date_of_transfer", ascending=False)
lr_filtered = lr_filtered.head(1)
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
else:
# We do a match on saon
lr_filtered["saon_match2"] = lr_filtered["saon"].apply(
lambda x: False if pd.isnull(x) else is_substring(x, match["epc_address"])
)
lr_filtered = lr_filtered[lr_filtered["saon_match2"]]
if lr_filtered.empty:
continue
elif lr_filtered.shape[0] == 1:
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
continue
else:
raise NotImplementedError("wtf")
else:
# We have a final check, based on an observed case
lr_address_1 = " ".join([x.lower().strip() for x in match["Property Address"].split(",")[0:2]])
lr_filtered["paon_match2"] = lr_filtered["paon"].apply(
lambda x: False if pd.isnull(x) else is_substring(x, lr_address_1)
)
lr_filtered = lr_filtered[lr_filtered["paon_match2"]]
if lr_filtered.empty:
continue
elif lr_filtered.shape[0] == 1:
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
continue
else:
# Check all the same
all_paon_equal, all_saon_equal, all_street_equal = check_equalities(lr_filtered)
# Check saon is house number with exact match
lr_filtered["saon_match2"] = lr_filtered["saon"].apply(
lambda x: False if pd.isnull(x) else house_number_match(x, match["house_number"])
)
# We check if we have a flat
match_flat_number = re.match("flat (\d+)", match["epc_address1"].lower())
match_apartment_number = re.match("apartment (\d+)", match["epc_address1"].lower())
lr_filtered["saon_match3"] = False
if match_flat_number is not None:
# Get out the match
match_flat_number = "flat " + match_flat_number.group(1)
lr_filtered["saon_match3"] = lr_filtered["saon"].apply(
lambda x: False if pd.isnull(x) else x == match_flat_number
)
if match_apartment_number is not None:
# Get out the match
match_apartment_number = "apartment " + match_apartment_number.group(1)
lr_filtered["saon_match3"] = lr_filtered["saon"].apply(
lambda x: False if pd.isnull(x) else x == match_apartment_number
)
if all_paon_equal and all_saon_equal and all_street_equal:
# Take the newest record
lr_filtered = lr_filtered.sort_values("date_of_transfer", ascending=False)
lr_filtered = lr_filtered.head(1)
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
continue
elif any(lr_filtered["saon_match2"]):
lr_filtered = lr_filtered[lr_filtered["saon_match2"]]
all_saon_equal, all_paon_equal, all_street_equal = check_equalities(lr_filtered)
if all_paon_equal and all_saon_equal and all_street_equal:
# Filter on the newest record
lr_filtered = lr_filtered.sort_values("date_of_transfer", ascending=False)
lr_filtered = lr_filtered.head(1)
if lr_filtered.shape[0] == 1:
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
continue
elif any(lr_filtered["saon_match3"]):
lr_filtered = lr_filtered[lr_filtered["saon_match3"]]
if lr_filtered.shape[0] == 1:
land_registry_matches.append(
{
"uprn": match["UPRN"],
"transaction_id": lr_filtered['transaction_id'].values[0],
"price": lr_filtered["price"].values[0],
"date_of_transfer": lr_filtered["date_of_transfer"].values[0],
}
)
continue
raise NotImplementedError("wtf")
else:
raise NotImplementedError("What happened here?")
land_registry_matches = pd.DataFrame(land_registry_matches)
# land_registry_matches.to_excel("land_registry_matches.xlsx")
# Check the matches against the addresses
# lr_to_addresses = matched_addresses[
# ["UPRN", "epc_address", "epc_postcode", "Property Address", "Postcode"]
# ].merge(
# land_registry_matches,
# how="inner",
# left_on="UPRN",
# right_on="uprn"
# ).drop(columns=["uprn"]).merge(
# land_registry[["transaction_id", "paon", "saon", "street", "postcode"]],
# how="left", on="transaction_id"
# )
# Merge onto matched addresses
matched_addresses = matched_addresses.merge(
land_registry_matches,
how="left",
left_on="UPRN",
right_on="uprn"
).drop(columns=["uprn"])
# Flat anything that sold in the last year
matched_addresses["sold_recently"] = (
matched_addresses["date_of_transfer"] >= pd.Timestamp.now() - pd.DateOffset(years=1)
)
matched_addresses["sale_lodged_recently"] = (
(pd.to_datetime(matched_addresses["LODGEMENT_DATE"]) >= pd.Timestamp.now() - pd.DateOffset(months=12)) &
(matched_addresses["TRANSACTION_TYPE"].isin(["marketed sale", "non marketed sale"]))
)
# Drop rows on the booleans
matched_addresses = matched_addresses[
~matched_addresses["sold_recently"] &
~matched_addresses["sale_lodged_recently"]
]
# Filter combined_matching_lookup accordingly
combined_matching_lookup = combined_matching_lookup[
combined_matching_lookup["UPRN"].isin(matched_addresses["UPRN"])
]
# shared_freehold_match = pd.DataFrame(shared_freehold_match)
# Strore these files
# freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx")
@ -457,33 +814,28 @@ def app():
# leasehold_matching_lookup = pd.read_excel("leasehold_matching_lookup.xlsx")
# shared_leasehold_match = pd.read_excel("shared_leasehold_match.xlsx")
freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership, properties)
leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties)
# freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership, properties)
# leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties)
combined_aggregate = aggregate_matches(
combined_matching_lookup, company_ownership, properties
matching_lookup=combined_matching_lookup,
company_ownership=company_ownership,
properties=properties
)
investment_20m = combined_aggregate[combined_aggregate["cumulative_value"] <= 20_500_000]
investment_50m = combined_aggregate[combined_aggregate["cumulative_value"] <= 51_000_000]
investment_20m_properties = matched_addresses[
matched_addresses["Company Registration No. (1)"].isin(investment_20m["Company Registration No. (1)"])
]
investment_50m_properties = matched_addresses[
matched_addresses["Company Registration No. (1)"].isin(investment_50m["Company Registration No. (1)"])
]
portfolio_epc_data_50m = properties[properties["UPRN"].isin(investment_50m_properties["UPRN"])]
portfolio_epc_data_20m = properties[properties["UPRN"].isin(investment_20m_properties["UPRN"])]
investment_20m_properties.to_excel("investment_20m_properties 28th May.xlsx", index=False)
investment_50m_properties.to_excel("investment_50m_properties 28th May.xlsx", index=False)
# Storing data
# investment_50m_properties.to_excel("investment_50m_properties 28th July.xlsx", index=False)
# Store the EPC data
portfolio_epc_data_50m.to_excel("portfolio_epc_data_50m 28th May.xlsx", index=False)
portfolio_epc_data_20m.to_excel("portfolio_epc_data_20m 28th May.xlsx", index=False)
# portfolio_epc_data_50m.to_excel("portfolio_epc_data_50m 29th July.xlsx", index=False)
# We check if any of these properties are in a conservation area
valuations = pd.read_excel("property value.xlsx")
@ -529,6 +881,48 @@ def company_aggregation():
aggregation.to_excel("Company ownership aggregation.xlsx")
def extract_price_info(text):
# Use regex to find the relevant price information
match = re.search(r'Estimated price\n\nLow£([\d,]+)k\n\n£([\d,]+)k\n\nHigh£([\d,]+)k', text)
if match:
low_price = int(match.group(1).replace(',', '')) * 1000
est_price = int(match.group(2).replace(',', '')) * 1000
high_price = int(match.group(3).replace(',', '')) * 1000
price_info = {
'Zoopla Valuation': est_price,
'Zoopla Lower Bound': low_price,
'Zoopla Upper Bound': high_price
}
return price_info
return None
def get_valuations(portfolio_epc_data_50m):
# This gets blocked pretty quickly by Zoopla
import requests
import time
from tqdm import tqdm
valuation_data = []
for _, property_data in tqdm(portfolio_epc_data_50m.iterrows(), total=len(portfolio_epc_data_50m)):
uprn = property_data["UPRN"]
response = requests.get(
f"https://r.jina.ai/https://www.zoopla.co.uk/property/uprn/{uprn}/"
)
pricing = extract_price_info(response.text)
valuation_data.append(
{
"UPRN": uprn,
**pricing
}
)
time.sleep(2)
def prepare_anonymised_data():
investment_50m_properties = pd.read_excel("investment_50m_properties 28th May.xlsx", header=0)
investment_epc_data = pd.read_excel("portfolio_epc_data_50m 28th May.xlsx", header=0)

View file

@ -0,0 +1,721 @@
import re
import numpy as np
import usaddress
from datetime import datetime
from xml.dom.minidom import parseString
from backend.app.utils import sap_to_epc
from etl.xml_survey_extraction.pcdb import heating_data
PROPERTY_TYPE_LOOKUP = {
"0": "House",
"House": "House",
}
def get_house_number(address: str) -> str | None:
"""
This method will use the usaddress library to parse an address and extract the house number
:return:
"""
parsed = usaddress.parse(address)
parsed_house_number = [x for x in parsed if (x[1] == "AddressNumber")]
parsed_house_number = parsed_house_number[0][0] if parsed_house_number else None
if parsed_house_number is None:
# Because usaddress isn't optimal for parsing addresses with some prefixes such as 'Flat',
# we also add a custom approach
# Pattern to look for 'Flat' or 'Apartment' followed by a number, or just a number at the beginning
pattern = r'(?i)(?:flat|apartment)\s*(\d+)|^\s*(\d+)'
match = re.search(pattern, address)
if match:
# Return the first non-None group found
return next(g for g in match.groups() if g is not None)
else:
return None
# Remove training commas
parsed_house_number = parsed_house_number.replace(",", "")
return parsed_house_number
class XmlParser:
epc = {}
additional_data = {}
uprn = None
# heating/emissions information
space_heating_kwh = None
water_heating_kwh = None
heating_system = None
heating_controls = None
# Assessor details
surveyor_name = None
number_of_doors = None
number_of_insulated_doors = None
windows = None
# Property dimensions
number_of_floors = None
perimeter = None
heat_loss_perimeter = None
party_wall_length = None
total_floor_area = None
floor_height = None
insulation_wall_area = None
floor_dimensions = None
# The age band lookup is based on the country code
AGE_BAND_LOOKUP = {
# England & Wales
"EAW": {
"A": "England and Wales: before 1900",
"B": "England and Wales: 1900-1929",
"C": "England and Wales: 1930-1949",
"D": "England and Wales: 1950-1966",
"E": "England and Wales: 1967-1975",
"F": "England and Wales: 1976-1982",
"G": "England and Wales: 1983-1990",
"H": "England and Wales: 1991-1995",
"I": "England and Wales: 1996-2002",
"J": "England and Wales: 2003-2006",
"K": "England and Wales: 2007-2011",
"L": "England and Wales: 2012 onwards",
}
}
RATINGS_MAP = {
"0": "N/A",
"1": "Very Poor",
"2": "Poor",
"3": "Average",
"4": "Good",
"5": "Very Good"
}
MECHANICAL_VENTILATION_MAP = {
"0": "natural"
}
BUILT_FORM_MAP = {
"1": "Detached",
}
GLAZED_AREA_MAP = {
"4": "Much More Than Typical"
}
FUEL_TYPE_MAP = {
"26": "mains gas (not community)"
}
TRANSACTION_TYPE_MAP = {
"13": "ECO assessment"
}
TENURE_MAP = {
'1': "Owner-occupied"
}
TARIFF_MAP = {
"1": "Dual",
"2": "Single"
}
def __init__(self, file, filekey, surveyor_company, uprn=None):
file.seek(0) # Ensure the file pointer is at the beginning
xml_string = file.read().decode('utf-8')
self.xml = parseString(xml_string)
self.filekey = filekey
self.surveyor_company = surveyor_company
# We check if we have a lig xml or rdsap xml
# We look for the presence of the Schema-Version-Original tag
self.is_lig = len(self.xml.getElementsByTagName("Schema-Version-Original")) > 0
self.get_uprn(uprn)
@staticmethod
def get_node(node):
"""
Utility function to get the node value from the xml, where data might be optional
:return:
"""
node_first_child = node.firstChild
if node_first_child is None:
return None
return node_first_child.nodeValue
def run(self):
if not self.is_lig:
return
self.get_assessor_details()
self.get_heating_and_emissions_data()
# self.get_detailed_heating_specs()
# Building fabric
self.get_doors()
self.get_floor_dimensions()
self.get_windows()
# Get all of the EPC data
self.extract_epc()
# Put together all of the additional data we capture
self.extract_additional_data()
def extract_epc(self):
if self.floor_dimensions is None:
raise ValueError("Run get_floor_dimensions() first")
if self.windows is None:
raise ValueError("Run get_windows() first")
property_type = self.get_property_type()
if property_type == "Flat":
raise NotImplementedError(
"Need to handle: heat-loss-corridor, unheated-corridor-length, flat-storey-count, flat-top-storey, "
"floor-level"
)
heat_loss_corridor = "NO DATA!"
unheated_corridor_length = ""
flat_storey_count = ""
flat_top_storey = ""
floor_level = "NO DATA!"
floor_height = np.mean([
float(x['room_height']) for x in self.floor_dimensions if
x['building_part_identifier'] == 'Main Dwelling' and not x['room_roof']
])
# Take the most prevelant glazing type
glazed_type = [w["glazing_type"] for w in self.windows if w['window_location'] == '0']
glazed_type = max(glazed_type, key=glazed_type.count)
energy_tariff = (
self.xml.getElementsByTagName("SAP-Energy-Source")[0]
.getElementsByTagName("Meter-Type")[0]
.firstChild.nodeValue
)
energy_tariff = self.TARIFF_MAP[energy_tariff]
self.epc = {
"uprn": self.uprn,
"uprn-source": "Address Matched",
"property-type": property_type,
"building-reference-number": "",
**self.get_sap(),
**self.get_property_address(),
"low-energy-fixed-light-count": self.get_node_value('Low-Energy-Fixed-Lighting-Outlets-Count'),
"construction-age-band": self.AGE_BAND_LOOKUP[
self.get_node_value('Country-Code')
][self.get_node_value('Construction-Age-Band')],
"mainheat-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Main-Heating', 'Energy-Efficiency-Rating')
],
"windows-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Window', 'Environmental-Efficiency-Rating')
],
"lighting-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Lighting', 'Energy-Efficiency-Rating')
],
"environment-impact-potential": self.get_energy_assessment_value('Environmental-Impact-Potential'),
"mainheatcont-description":
self.get_property_summary_value('Main-Heating-Controls', 'Description'),
"sheating-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Secondary-Heating', 'Energy-Efficiency-Rating')
],
"local-authority": "", # Not included in the xml
"local-authority-label": "",
"fixed-lighting-outlets-count": self.get_node_value('Fixed-Lighting-Outlets-Count'),
"energy-tariff": energy_tariff,
"mechanical-ventilation": self.MECHANICAL_VENTILATION_MAP[self.get_node_value('Mechanical-Ventilation')],
"solar-water-heating-flag": self.get_node_value('Solar-Water-Heating'),
"co2-emissions-potential": self.get_energy_assessment_value('CO2-Emissions-Potential'),
"number-heated-rooms": self.get_node_value('Heated-Room-Count'),
"floor-description": self.get_property_summary_value('Floor', 'Description'),
"energy-consumption-potential": self.get_energy_assessment_value('Energy-Consumption-Potential'),
"built-form": self.BUILT_FORM_MAP[self.get_node_value('Built-Form')],
"number-open-fireplaces": self.get_node_value('Open-Fireplaces-Count'),
"windows-description": self.get_property_summary_value('Window', 'Description'),
"glazed-area": self.GLAZED_AREA_MAP[self.get_node_value('Glazed-Area')],
"inspection-date": self.get_node_value('Inspection-Date'),
"mains-gas-flag": self.get_node_value('Mains-Gas'),
"co2-emiss-curr-per-floor-area": self.get_energy_assessment_value('CO2-Emissions-Current-Per-Floor-Area'),
"heat-loss-corridor": heat_loss_corridor,
"unheated-corridor-length": unheated_corridor_length,
"flat-storey-count": flat_storey_count,
"roof-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Roof', 'Energy-Efficiency-Rating')
],
"total-floor-area": self.get_node_value('Total-Floor-Area'),
"environment-impact-current": self.get_energy_assessment_value('Environmental-Impact-Current'),
"roof-description": self.get_property_summary_value('Roof', 'Description'),
"floor-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Floor', 'Energy-Efficiency-Rating')
],
"number-habitable-rooms": self.get_node_value('Habitable-Room-Count'),
"hot-water-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Hot-Water', 'Environmental-Efficiency-Rating')
],
"mainheatc-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Main-Heating-Controls', 'Energy-Efficiency-Rating')
],
"main-fuel": self.FUEL_TYPE_MAP[self.get_node_value('Main-Fuel-Type')],
"lighting-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Lighting', 'Environmental-Efficiency-Rating')
],
"windows-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Window', 'Energy-Efficiency-Rating')
],
"floor-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Floor', 'Environmental-Efficiency-Rating')
],
"sheating-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Secondary-Heating', 'Environmental-Efficiency-Rating')
],
"lighting-description": self.get_property_summary_value('Lighting', 'Description'),
"roof-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Roof', 'Environmental-Efficiency-Rating')
],
"walls-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Wall', 'Energy-Efficiency-Rating')
],
"photo-supply": self.get_photo_supply(),
"lighting-cost-potential": self.get_energy_assessment_value('Lighting-Cost-Potential'),
"mainheat-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Main-Heating', 'Environmental-Efficiency-Rating')
],
"multi-glaze-proportion": self.get_node_value('Multiple-Glazed-Proportion'),
"main-heating-controls": self.get_property_summary_value('Main-Heating-Controls', 'Description'),
"flat-top-storey": flat_top_storey,
"secondheat-description": self.get_property_summary_value('Secondary-Heating', 'Description'),
"walls-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Wall', 'Environmental-Efficiency-Rating')
],
"transaction-type": self.TRANSACTION_TYPE_MAP[self.get_node_value('Transaction-Type')],
"extension-count": self.get_node_value('Extensions-Count'),
"mainheatc-env-eff": self.RATINGS_MAP[
self.get_property_summary_value('Main-Heating-Controls', 'Environmental-Efficiency-Rating')
],
"lmk-key": "", # Doesn't exist for non-EPC xmls
"wind-turbine-count": self.get_node_value('Wind-Turbines-Count'),
"tenure": self.TENURE_MAP[self.get_node_value('Tenure')],
"floor-level": floor_level,
"potential-energy-efficiency": self.get_energy_assessment_value('Energy-Rating-Potential'),
"potential-energy-rating": sap_to_epc(float(self.get_energy_assessment_value('Energy-Rating-Potential'))),
"hot-water-energy-eff": self.RATINGS_MAP[
self.get_property_summary_value('Hot-Water', 'Energy-Efficiency-Rating')
],
"low-energy-lighting": self.get_node_value('Low-Energy-Lighting'),
"walls-description": self.get_property_summary_value('Wall', 'Description'),
"hotwater-description": self.get_property_summary_value('Hot-Water', 'Description'),
"co2-emissions-current": self.get_node_value('CO2-Emissions-Current'),
"heating-cost-current": self.get_node_value('Heating-Cost-Current'),
"heating-cost-potential": self.get_energy_assessment_value('Heating-Cost-Potential'),
"hot-water-cost-current": self.get_node_value('Hot-Water-Cost-Current'),
"hot-water-cost-potential": self.get_energy_assessment_value('Hot-Water-Cost-Potential'),
"lighting-cost-current": self.get_node_value('Lighting-Cost-Current'),
"energy-consumption-current": self.get_node_value('Energy-Consumption-Current'),
"lodgement-date": self.get_node_value('Inspection-Date'),
"lodgement-datetime":
datetime.strptime(self.get_node_value('Inspection-Date'), "%Y-%m-%d").isoformat(),
"mainheat-description": self.get_property_summary_value('Main-Heating', 'Description'),
"floor-height": floor_height,
"glazed-type": glazed_type,
}
def get_insulation_wall_area(self):
"""
Extracts the insulation wall area for the main dwelling
Note that this doesn't include any extensions. We don't have recommendations for extensions right now, so we
don't currently calculate the insulation wall area for them, since it's not used in the recommendations.
"""
main_dwelling_floors = [
f for f in self.floor_dimensions if f["building_part_identifier"] == "Main Dwelling" and not f["room_roof"]
]
main_dwelling_windows = [
w for w in self.windows if w["window_location"] == "0"
]
wall_areas = sum([float(f["heat_loss_perimeter"]) * float(f["room_height"]) for f in main_dwelling_floors])
window_areas = sum([float(w["window_area"]) for w in main_dwelling_windows])
return wall_areas - window_areas
def extract_additional_data(self):
self.insulation_wall_area = self.get_insulation_wall_area()
# We pull this out which is used as the insulation floor area
main_dwelling_ground_floor_area = [
f for f in self.floor_dimensions if f["building_part_identifier"] == "Main Dwelling" and f["floor"] == "0"
][0]["total_floor_area"]
main_dwelling_windows = [w for w in self.windows if w["window_location"] == "0"]
number_of_windows = len(main_dwelling_windows)
windows_area = sum([float(w["window_area"]) for w in main_dwelling_windows])
boolean_lookup = {
"true": True,
"false": False,
"Y": True,
"N": False
}
cylinder_insulation_type = {
"1": "Foam",
}
self.additional_data = {
"file_location": self.filekey,
"surveyor_name": self.surveyor_name,
"surveyor_company": self.surveyor_company,
"space_heating_kwh": self.space_heating_kwh,
"water_heating_kwh": self.water_heating_kwh,
# "heating_system": self.heating_system,
# "heating_controls": self.heating_controls,
"number_of_doors": self.number_of_doors,
"number_of_insulated_doors": self.number_of_insulated_doors,
"number_of_floors": self.number_of_floors,
"insulation_wall_area": self.insulation_wall_area,
"heat_loss_perimeter": self.heat_loss_perimeter,
"party_wall_length": self.party_wall_length,
"perimeter": self.perimeter,
"rooms_with_bath_and_or_shower": int(self.get_node_value('Rooms-With-Bath-And-Or-Shower')),
"rooms_with_mixer_shower_no_bath": int(self.get_node_value('Rooms-With-Mixer-Shower-No-Bath')),
"room_with_bath_and_mixer_shower": int(self.get_node_value('Rooms-With-Bath-And-Mixer-Shower')),
"percent_draftproofed": int(self.get_node_value('Percent-Draughtproofed')),
"has_hot_water_cylinder": boolean_lookup[self.get_node_value('Has-Hot-Water-Cylinder')],
"cylinder_insulation_type": cylinder_insulation_type[self.get_node_value('Cylinder-Insulation-Type')],
"cylinder_insulation_thickness": int(self.get_node_value('Cylinder-Insulation-Thickness')),
"cylinder_thermostat": boolean_lookup[self.get_node_value('Cylinder-Thermostat')],
"main_dwelling_ground_floor_area": float(main_dwelling_ground_floor_area),
"number_of_windows": int(number_of_windows),
"windows_area": float(windows_area),
}
def get_node_value(self, tag_name):
nodes = self.xml.getElementsByTagName(tag_name)
if nodes and nodes[0].firstChild:
return nodes[0].firstChild.nodeValue
return None
def get_node_value_from_floor_dimensions(self, tag_name):
nodes = self.xml.getElementsByTagName('SAP-Floor-Dimension')
if nodes:
tag = nodes[0].getElementsByTagName(tag_name)
if tag and tag[0].firstChild:
return tag[0].firstChild.nodeValue
return None
def get_property_summary_value(self, section, tag_name):
nodes = self.xml.getElementsByTagName('Property-Summary')[0].getElementsByTagName(section)
if nodes:
tag = nodes[0].getElementsByTagName(tag_name)
if tag and tag[0].firstChild:
return tag[0].firstChild.nodeValue
return None
def get_energy_assessment_value(self, tag_name):
nodes = self.xml.getElementsByTagName('Energy-Assessment')[0]
if nodes:
tag = nodes.getElementsByTagName(tag_name)
if tag and tag[0].firstChild:
return tag[0].firstChild.nodeValue
return None
def get_uprn(self, uprn):
if uprn is not None:
self.uprn = uprn
return
uprn_tag = self.xml.getElementsByTagName('UPRN')[0].firstChild
if uprn_tag is None:
self.uprn = -1
return
self.uprn = uprn_tag.nodeValue
# If all of the characters in the UPRN are 0, then there is not set UPRN
if self.uprn.count("0") == len(self.uprn):
self.uprn = 0
else:
self.uprn = self.uprn.lower().split("uprn-")[1]
def get_property_type(self):
if not self.xml:
raise ValueError("You need to read the file first")
property_type = self.xml.getElementsByTagName('Property-Type')
if not property_type:
property_type = self.xml.getElementsByTagName('PropertyType1')
return PROPERTY_TYPE_LOOKUP[property_type[0].firstChild.nodeValue]
def get_sap(self):
sap_score = self.xml.getElementsByTagName('Energy-Rating-Current')
sap_score = int(sap_score[0].firstChild.nodeValue)
epc_rating = sap_to_epc(sap_score)
return {
"current-energy-efficiency": str(sap_score),
"current-energy-rating": epc_rating
}
def get_heating_and_emissions_data(self):
"""
This method will extract the following pieces of information:
1) Space heating requirement
2) Water heating requirement
3) CO2 emissions
4) Heat demand per square meter per year
5) Bills
:return:
"""
self.space_heating_kwh = self.xml.getElementsByTagName(
'Space-Heating-Existing-Dwelling'
)[0].firstChild.nodeValue
self.water_heating_kwh = self.xml.getElementsByTagName('Water-Heating')[0].firstChild.nodeValue
def get_detailed_heating_specs(self):
"""
Given the heating data that is found in the <SAP-Heating> tag, we extract the detailed about the heating
system
:return:
"""
sap_main_heating_details = (
self.xml.getElementsByTagName('SAP-Heating')[0]
.getElementsByTagName("Main-Heating-Details")[0]
.getElementsByTagName("Main-Heating")[0]
)
heating_code = sap_main_heating_details.getElementsByTagName("Main-Heating-Number")[0].firstChild.nodeValue
# Get the heating system
heating_system = heating_data[heating_data["code"] == int(heating_code)]["description"]
heating_system = heating_system.values[0] if not heating_system.empty else f"Heating code: {heating_code}"
# Get the heating controls
heating_controls_code = (
sap_main_heating_details.getElementsByTagName("Main-Heating-Control")[0].firstChild.nodeValue
)
heating_controls = heating_data[heating_data["code"] == int(heating_controls_code)]["description"]
heating_controls = (
heating_controls.values[0] if not heating_controls.empty else f"Heating Controls code: {heating_code}"
)
self.heating_system = heating_system
self.heating_controls = heating_controls
def get_doors(self):
# Doors can be found in the SAP-Property-Details tag
self.number_of_doors = int(
self.xml.getElementsByTagName('SAP-Property-Details')[0]
.getElementsByTagName('Door-Count')[0]
.firstChild.nodeValue
)
self.number_of_insulated_doors = int(
self.xml.getElementsByTagName('SAP-Property-Details')[0]
.getElementsByTagName('Insulated-Door-Count')[0]
.firstChild.nodeValue
)
def get_photo_supply(self):
photo_supply_tag = self.xml.getElementsByTagName("Photovoltaic-Supply")[0]
# Check if the "None-Or-No-Details" tag is present
if photo_supply_tag.getElementsByTagName("None-Or-No-Details"):
return (
photo_supply_tag.
getElementsByTagName("None-Or-No-Details")[0].
getElementsByTagName("Percent-Roof-Area")[0].
firstChild.nodeValue
)
else:
raise NotImplementedError("Implement me")
def get_assessor_details(self):
energy_assessor_tag = self.xml.getElementsByTagName('Energy-Assessor')[0]
self.surveyor_name = (
energy_assessor_tag.getElementsByTagName("Name")[0].firstChild.nodeValue
)
def get_property_address(self):
property_tag = self.xml.getElementsByTagName("Property")[0]
address1 = self.get_node(property_tag.getElementsByTagName("Address-Line-1")[0])
address2 = self.get_node(property_tag.getElementsByTagName("Address-Line-2")[0])
address3 = self.get_node(property_tag.getElementsByTagName("Address-Line-3")[0])
posttown = self.get_node(property_tag.getElementsByTagName("Post-Town")[0])
postcode = self.get_node(property_tag.getElementsByTagName("Postcode")[0])
address = ", ".join(
[x for x in [address1, address2, address3] if x is not None]
)
county = property_tag.getElementsByTagName("County")
if county:
county = county[0].firstChild.nodeValue
else:
county = ""
# Seems to be unavailable in the xml
constituency = None
constituency_label = None
return {
"address1": address1,
"address2": address2,
"address3": address3,
"posttown": posttown,
"postcode": postcode,
"address": address,
"county": county,
"constituency": constituency,
"constituency-label": constituency_label
}
def get_floor_dimensions(self):
"""
Extracts physical measurements of the property such as the floor area, room height, etc.
across the main dwelling and any extensions.
:return:
"""
def get_part_value(node, tag_name):
element = node.getElementsByTagName(tag_name)
if element and element[0].firstChild:
return element[0].firstChild.nodeValue
return None
# Each part will correspond to the main
sap_building_parts = self.xml.getElementsByTagName("SAP-Building-Part")
floor_dimensions = []
for building_part in sap_building_parts:
building_part_identifier = building_part.getElementsByTagName("Identifier")[0].firstChild.nodeValue
sap_floor_dimensions = building_part.getElementsByTagName("SAP-Floor-Dimension")
data = [
{
'building_part_identifier': building_part_identifier,
'floor': get_part_value(floor_dimension, 'Floor'),
'floor_construction': get_part_value(floor_dimension, 'Floor-Construction'),
'floor_insulation': get_part_value(floor_dimension, 'Floor-Insulation'),
'heat_loss_perimeter': get_part_value(floor_dimension, 'Heat-Loss-Perimeter'),
'party_wall_length': get_part_value(floor_dimension, 'Party-Wall-Length'),
'total_floor_area': get_part_value(floor_dimension, 'Total-Floor-Area'),
'room_height': get_part_value(floor_dimension, 'Room-Height'),
"room_roof": False
} for floor_dimension in sap_floor_dimensions
]
room_roofs = building_part.getElementsByTagName("SAP-Room-In-Roof")
room_roof_data = [
{
"building_part_identifier": building_part_identifier,
"floor": str(max([int(d["floor"]) for d in data]) + 1),
"floor_construction": "",
"floor_insulation": rr.getElementsByTagName("Insulation")[0].firstChild.nodeValue,
"heat_loss_perimeter": "",
"party_wall_length": "",
"total_floor_area": rr.getElementsByTagName("Floor-Area")[0].firstChild.nodeValue,
"room_height": "",
"room_roof": True
} for rr in room_roofs
]
floor_dimensions.extend(data)
floor_dimensions.extend(room_roof_data)
self.floor_dimensions = floor_dimensions
self.number_of_floors = len(
[f for f in self.floor_dimensions if f["building_part_identifier"] == "Main Dwelling"]
)
# We extract the maximum heat loss perimeter, per building part
max_heat_loss_perimeters = {d['building_part_identifier']: max(
(float(x['heat_loss_perimeter']) for x in self.floor_dimensions if
x['building_part_identifier'] == d['building_part_identifier'] and x['heat_loss_perimeter']),
default=float('-inf')
) for d in self.floor_dimensions}
self.heat_loss_perimeter = sum(max_heat_loss_perimeters.values())
max_party_walls = {
d['building_part_identifier']: max(
(float(x['party_wall_length']) for x in self.floor_dimensions if
x['building_part_identifier'] == d['building_part_identifier'] and x['party_wall_length']),
default=float('-inf')
) for d in self.floor_dimensions
}
self.party_wall_length = sum(max_party_walls.values())
self.perimeter = self.heat_loss_perimeter + self.party_wall_length
def get_windows(self):
"""
Extracts data about the windows in the property, including the number of windows and the window type.
:return:
"""
sap_windows = self.xml.getElementsByTagName("SAP-Windows")[0].getElementsByTagName("SAP-Window")
glazing_type_lookup = {
"3": "double glazing, unknown install date"
}
orientation_lookup = {
"1": "North",
"2": "North East",
"3": "East",
"4": "South East",
"5": "South",
"6": "South West",
"7": "West",
"8": "North West"
}
self.windows = [
{
"window_location": window.getElementsByTagName("Window-Location")[0].firstChild.nodeValue,
"window_area": window.getElementsByTagName("Window-Area")[0].firstChild.nodeValue,
"window_type": window.getElementsByTagName("Window-Type")[0].firstChild.nodeValue,
"glazing_type": glazing_type_lookup[
window.getElementsByTagName("Glazing-Type")[0].firstChild.nodeValue
],
"pvc_frame": window.getElementsByTagName("PVC-Frame")[0].firstChild.nodeValue,
"glazing_gap": window.getElementsByTagName("Glazing-Gap")[0].firstChild.nodeValue,
"orientation": orientation_lookup[window.getElementsByTagName("Orientation")[0].firstChild.nodeValue]
} for window in sap_windows
]

View file

@ -1,3 +1,108 @@
from backend.app.db.functions.energy_assessment_functions import bulk_insert_energy_assessments
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from utils.s3 import read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder, save_csv_to_s3
from utils.logger import setup_logger
from etl.xml_survey_extraction.XmlParser import XmlParser
import os
import pandas as pd
from io import BytesIO
logger = setup_logger()
BUCKET = "retrofit-energy-assessments-dev"
USER_ID = 8
SCENARIOS = {
86: {
"project_code": "VDE001",
"surveyor": "JAFFERSONS ENERGY CONSULTANTS",
"bodies": [
# Scenario A: Cavity wall insulation
{
"portfolio_id": str(86),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace", "solar_pv", "heating"],
"budget": None,
"scenario_name": "Low Hanging Fruit",
"multi_plan": True,
},
# Scenario B: CWI, Solar PV, AHSP
{
"portfolio_id": str(86),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace"],
"budget": None,
"scenario_name": "Deep Retrofit",
"multi_plan": True,
},
# Scenario C, CWI, floor insulation, PV, AHSP
{
"portfolio_id": str(86),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["fireplace"],
"budget": None,
"scenario_name": "Whole House Retrofit",
"multi_plan": True,
}
]
},
87: {
"project_code": "VDE002",
"surveyor": "JAFFERSONS ENERGY CONSULTANTS",
"bodies": [
# Scenario A: Solar PV, AHSP
{
"portfolio_id": str(87),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace"],
"budget": None,
"scenario_name": "Deep Retrofit",
"multi_plan": True,
},
# Scenario B, floor insulation, PV, AHSP
{
"portfolio_id": str(87),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["fireplace"],
"budget": None,
"scenario_name": "Whole House Retrofit",
"multi_plan": True,
}
]
}
}
def main():
"""
This function executes the main process, which will retrieve data from the specified locations, extract the data
@ -6,4 +111,123 @@ def main():
"""
# TODO: Build solution to get this data from Onedrive and store what we need in S3
# In s3, we have a bucket called retrofit-energy-assessments-{stage} which
# In s3, we have a bucket called retrofit-energy-assessments-{stage} which contains the data we need
# The data is stored in a folder called {surveyors}/{project_code}/{uprn}
# We'll need to get the uprn from the folder name, which we can do with EpcSearcher class
# TODO: Pull out county, as in create_epc_records in the router, we pull it from the latest EPC, but we should
# be able to deduce it from just the address. Same for constituency and constituency_label
# TODO: Store the project code in the database
#
for scenario_config in SCENARIOS.values():
energy_assessments = list_files_and_subfolders_in_s3_folder(
bucket_name=BUCKET, folder_name=f"{scenario_config['surveyor']}/{scenario_config['project_code']}/"
)
logger.info(
f"Found {len(energy_assessments)} energy assessments for {scenario_config['surveyor']} and "
f"{scenario_config['project_code']}"
)
assessments_map = {}
for assessment in energy_assessments:
uploaded_xmls = list_xmls_in_s3_folder(
bucket_name=BUCKET, folder_name=os.path.join(assessment, "docs & plans")
)
uprn = int(assessment.rstrip("/").split("/")[-1])
assessments_map[uprn] = uploaded_xmls
logger.info(f"Exatracted XMLS for the energy assessments")
# TODO: IF we have many uploads, we can do them in a batch so we don't try and upload huge amounts of data to
# the database at onece
# TODO: We now have detailed information about primary and secondary walls, so we should use this information
# in our recommendations when we have it
# For example, for 77 Peryn Road, W3 7LT, the energy assessment has a main dwelling and two extensions,
# where
# the physical dimensions and the fabric of each building is constructed in a way as if each building is
# separate. We should use this information to make recommendations that are specific to each building
# part, though the problem here is that while the fabric and dimensions are separate, the actual SAP,
# CO2, etc
# figures span across the entire property.
# Idea: We can collect all of this information by building part and store it separately in the database
# against the uprn. We can have key data for the EPC, but then also additional data for each
# building
# part. We can then use this data to make recommendations that are specific to each building part
# We should probably re-think this data model, so we break up the data in a more considered fasion and
# produce
# the underlying EPC data as a summary of the building parts. Not only do we have data against the main
# dwelling and extensions, but we also have multiple windows with individiaul pieces of information that
# we can use to make recommendations. We should store this data in a way that we can easily access it and
# use it to make recommendations (e.g. we should have a Windows table)
# For each property, we download the xmls and extract the data
database_data = []
for uprn, xmls in assessments_map.items():
extracted_data = {}
for xml in xmls:
xml_data = read_from_s3(bucket_name=BUCKET, s3_file_name=xml)
xml_data_io = BytesIO(xml_data)
xml_parser = XmlParser(
file=xml_data_io,
filekey=os.path.join(f"s3://{BUCKET}", xml),
uprn=uprn,
surveyor_company=scenario_config["surveyor"],
)
xml_parser.run()
if xml_parser.is_lig:
logger.info(f"Extracted data from {xml}")
extracted_epc = xml_parser.epc
extracted_additional_data = xml_parser.additional_data
data_to_update = {
**extracted_epc, **extracted_additional_data
}
# We need to update the keys to match the database schema - i.e. we should replace all hyphens with
# underscores
data_to_update = {k.replace("-", "_"): v for k, v in data_to_update.items()}
extracted_data.update(data_to_update)
database_data.append(extracted_data)
logger.info("Uploading data to the database")
session = sessionmaker(bind=db_engine)()
bulk_insert_energy_assessments(session, database_data)
session.close()
# Create the asset list
asset_list = [
{"uprn": x["uprn"], "address": x["address1"], "postcode": x["postcode"]} for x in database_data
]
asset_list = pd.DataFrame(asset_list)
# Store the asset list in s3
filename = f"{USER_ID}/{scenario_config['bodies'][0]['portfolio_id']}/non_intrusives.csv"
save_csv_to_s3(
dataframe=asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
for body in scenario_config["bodies"]:
body["trigger_file_path"] = filename
print(body)
# TODO: In order to get the full data associated to the heating system, we need to download and parse the pcdb which
# can be found here: https://www.ncm-pcdb.org.uk/pcdb/pcdb10.dat
# https://www.ncm-pcdb.org.uk/sap/download
# However retrieving this data is not a priority, so we can leave this for now as parsing the database
# is a non-trivial task
# TODO: The condition report contains additional data such as the number of bedrooms and the number of bathrooms
# We can extract this data and store it in the database as well. We can then update our kwargs methodology
# that is passed to the property class, where instead we store this additional data in our database (it could
# be stored in the energy assessment table, or in a separate table) and then when we're passed additional data
# we can query the database for this data and use it to update the property object, instead of storing it
# in the asset list and pulling it out of the asset list
# 1) Bathrooms
# 2) Bedrooms

File diff suppressed because it is too large Load diff

View file

@ -50,5 +50,8 @@ class FireplaceRecommendations(Definitions):
# Take a very basic estimate of 6 hours, multipled by the number of open fireplaces to seal
"labour_hours": 6 * number_open_fireplaces,
"labour_days": 6 * number_open_fireplaces / 8, # Assume 8 hour day
"description_simulation": {
"number-open-fireplaces": 0
}
}
]

View file

@ -116,7 +116,7 @@ class HeatingRecommender:
# In the future, we'll allow overrides, so that non-intrusive surveys can contradict these conditions
# and either allow or prevent the recommendation of an air source heat pump
if self.is_ashp_valid(exclusions=exclusions):
if self.property.is_ashp_valid(exclusions=exclusions):
self.recommend_air_source_heat_pump(
phase=phase, has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations
)
@ -186,19 +186,6 @@ class HeatingRecommender:
description = ("Replace the existing boiler and cylinder without a thermostat with a new electric combi "
"boiler")
def is_ashp_valid(self, exclusions):
if "air_source_heat_pump" in self.property.non_invasive_recommendations:
return True
if "air_source_heat_pump" in exclusions:
return False
suitable_property_type = self.property.data["property-type"] in ["House", "Bungalow"]
has_air_source_heat_pump = self.property.main_heating["has_air_source_heat_pump"]
return suitable_property_type and not has_air_source_heat_pump
def recommend_air_source_heat_pump(self, phase, has_cavity_or_loft_recommendations, _return=False):
"""
This method will implement the recommendation for an air source heat pump

View file

@ -87,6 +87,25 @@ class RoofRecommendations:
return (self.insulation_thickness > self.MINIMUM_LOFT_ISULATION_MM) and self.property.roof["is_pitched"]
def is_room_roof_insulated(self):
"""
Check if the room roof is already insulated
"""
full_insulated_room_roof = (
self.property.roof["is_roof_room"] and
self.property.roof["insulation_thickness"] in ["average", "above_average"]
)
room_roof_insulated_at_rafters = (
self.property.roof["is_pitched"] and
self.property.roof["is_at_rafters"] and
self.property.roof["insulation_thickness"] in ["average", "above_average"]
)
return full_insulated_room_roof or room_roof_insulated_at_rafters
def recommend(self, phase):
if self.property.roof["has_dwelling_above"]:
@ -105,8 +124,8 @@ class RoofRecommendations:
if (self.insulation_thickness >= self.MINIMUM_FLAT_ROOF_ISULATION_MM) and self.property.roof["is_flat"]:
return
if self.property.roof["is_roof_room"]:
raise ValueError("Update convert_thickness_to_numeric for room roof and implement")
if self.is_room_roof_insulated():
return
# If we have a u-value already, need to implement this
if u_value:
@ -118,7 +137,17 @@ class RoofRecommendations:
return
raise NotImplementedError("Implement me")
u_value = get_roof_u_value(**{**self.property.roof, "age_band": self.property.age_band})
u_value = get_roof_u_value(
insulation_thickness=self.property.roof["insulation_thickness"],
has_dwelling_above=self.property.roof["has_dwelling_above"],
is_loft=self.property.roof["is_loft"],
is_roof_room=self.property.roof["is_roof_room"],
is_thatched=self.property.roof["is_thatched"],
age_band=self.property.age_band,
is_flat=self.property.roof["is_flat"],
is_pitched=self.property.roof["is_pitched"],
is_at_rafters=self.property.roof["is_at_rafters"],
)
self.estimated_u_value = u_value
if (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) and (

View file

@ -78,23 +78,6 @@ class SolarPvRecommendations:
}
]
def is_solar_pv_valid(self):
# If the property is a flat but we are looking at building solar potential, we can include this
if (self.property.building_id is not None) and (self.property.solar_panel_configuration is not None):
return True
is_valid_property_type = self.property.data["property-type"] in ["House", "Bungalow", "Maisonette"]
is_valid_roof_type = (
self.property.roof["is_flat"] or self.property.roof["is_pitched"] or self.property.roof["is_roof_room"]
)
# If there is no existing solar PV, the photo-supply field will be None or a missing value
has_no_existing_solar_pv = self.property.data["photo-supply"] in [
None, 0, self.property.DATA_ANOMALY_MATCHES
]
return is_valid_property_type and is_valid_roof_type and has_no_existing_solar_pv
def recommend_building_analysis(self, phase):
"""
This recommendation approach handles the case of producing solar PV recommendations at the building level,
@ -117,7 +100,7 @@ class SolarPvRecommendations:
roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / total_roof_area * 100)
# Spread the cost to the individual units - adding a 20% contingency
total_cost = recommendation_config["total_cost"] / n_units
kw = np.floor(recommendation_config["array_warrage"] / 100) / 10
kw = np.floor(recommendation_config["array_wattage"] / 100) / 10
# Default to a weeks work for a team of 3 people doing 8 hour days
labour_days = 5
labour_hours = 3 * 8 * labour_days
@ -159,7 +142,7 @@ class SolarPvRecommendations:
:return:
"""
if not self.is_solar_pv_valid():
if not self.property.is_solar_pv_valid():
return
# If we have a buiilding level analysis, we implement separate logic
@ -167,84 +150,47 @@ class SolarPvRecommendations:
self.recommend_building_analysis(phase)
return
solar_pv_percentage = self.property.solar_pv_percentage
# We round up to the neaest 10%
solar_pv_percentage = np.ceil(solar_pv_percentage * 10) / 10
panel_performance = self.property.solar_panel_configuration["panel_performance"]
roof_area = self.property.roof_area
# For the solar recommendations, we produce the following scenarios:
# 1) Solar panels only, we present a high, medium and low coverage
# 2) With and without battery
roof_coverage_scenarios = [
solar_pv_percentage - 0.1, solar_pv_percentage,
]
if solar_pv_percentage <= 0.4:
roof_coverage_scenarios.append(solar_pv_percentage + 0.1)
# We make sure we haven't gone too low or high - we allow no more than 60% coverage
roof_coverage_scenarios = [v for v in roof_coverage_scenarios if 0 <= v <= 0.6]
# If we only have two scenarios, we add a coverage scenario 10% less than the smallest
if len(roof_coverage_scenarios) == 2:
roof_coverage_scenarios.insert(0, roof_coverage_scenarios[0] - 0.1)
battery_scenarios = [False, True]
solar_configurations = panel_performance.head(3).reset_index(drop=True)
scenarios_with_wattage = []
for roof_coverage in roof_coverage_scenarios:
# We now have a property which is potentially suitable for solar PV
solar_pv_roof_area = self.property.get_solar_pv_roof_area(roof_coverage)
# We combine each of these configurations with estimates with and without a battery
for rank, recommendation_config in solar_configurations.iterrows():
roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / roof_area * 100)
for has_battery in [False, True]:
cost_result = self.costs.solar_pv(
wattage=recommendation_config["array_wattage"], has_battery=has_battery
)
kw = np.floor(recommendation_config["array_wattage"] / 100) / 10
if has_battery:
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) panel system on "
f"{round(roof_coverage_percent)}% the roof, with a battery storage system.")
else:
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) p"
f"anel system on {round(roof_coverage_percent)}% the roof.")
number_solar_panels = np.floor(solar_pv_roof_area / self.SOLAR_PANEL_AREA)
solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE
already_installed = "solar_pv" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
if solar_panel_wattage < self.MIN_SYSTEM_WATTAGE:
continue
solar_panel_wattage = np.clip(
a=solar_panel_wattage, a_min=self.MIN_SYSTEM_WATTAGE, a_max=self.MAX_SYSTEM_WATTAGE
)
scenarios_with_wattage.append((roof_coverage, solar_panel_wattage))
# We trim the scenarios, so that we don't have duplicate wattages
scenarios_with_wattage = self.trim_solar_wattage_options(scenarios_with_wattage)
# Produce the cross product of the scenarios
scenarios = [
(roof, wattage, battery) for roof, wattage in scenarios_with_wattage for battery in battery_scenarios
]
# We deduce the wattage of the solar panels based on the roof coverage
for roof_coverage, solar_panel_wattage, has_battery in scenarios:
# We now have a property which is potentially suitable for solar PV
roof_coverage_percent = round(roof_coverage * 100)
# Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
# of solar PV installations
cost_result = self.costs.solar_pv(wattage=solar_panel_wattage, has_battery=has_battery)
kw = np.floor(solar_panel_wattage / 100) / 10
if has_battery:
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) panel system on "
f"{round(roof_coverage_percent)}% the roof, with a battery storage system.")
else:
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) p"
f"anel system on {round(roof_coverage_percent)}% the roof.")
already_installed = "solar_pv" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
self.recommendation.append(
{
"phase": phase,
"parts": [],
"type": "solar_pv",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
**cost_result,
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
# back up here
"photo_supply": 100 * roof_coverage,
"has_battery": has_battery,
"description_simulation": {"photo-supply": 100 * roof_coverage},
}
)
self.recommendation.append(
{
"phase": phase,
"parts": [],
"type": "solar_pv",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
**cost_result,
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we
# scale
# back up here
"photo_supply": roof_coverage_percent,
"has_battery": has_battery,
"initial_ac_kwh_per_year": recommendation_config["initial_ac_kwh_per_year"],
"description_simulation": {"photo-supply": roof_coverage_percent},
}
)

View file

@ -48,6 +48,7 @@ class WindowsRecommendations:
is_secondary_glazing = self.property.restricted_measures or (
self.property.windows["glazing_type"] == "secondary"
)
windows_area = self.property.windows_area
if not number_of_windows:
raise ValueError("Number of windows not specified")
@ -57,6 +58,9 @@ class WindowsRecommendations:
):
return
if windows_area is not None:
raise Exception("We have windows area, we should use this data for our recommendations!!!")
# We scale the number of windows based on the proportion of existing glazing
if self.property.data["multi-glaze-proportion"] != "":
n_windows_scalar = 1 - (

View file

@ -9,7 +9,7 @@ def prepare_input_measures(property_recommendations, goal):
"""
goal_map = {
"Increase EPC": "sap_points"
"Increasing EPC": "sap_points"
}
goal_key = goal_map[goal]

View file

@ -205,10 +205,22 @@ def get_wall_u_value(
return float(mapped_value)
def get_u_value_from_s9(thickness, s9, is_loft, is_roof_room, is_thatched):
def get_u_value_from_s9(thickness, s9, is_loft, is_roof_room, is_thatched, is_at_rafters):
"""Get the U-value from table S9 based on the insulation thickness."""
# If the roof as pitched & insulated at the rafters, it's a room roof
if is_roof_room or is_at_rafters:
# We re-map the thickness
thickness_map = {
"below average": "50",
"average": "100",
"above average": "270",
"none": "0",
}
thickness = thickness_map[thickness]
if thickness in ["below average", "average", "above average", "none", None] or (
not is_loft and not is_roof_room
not is_loft and not is_roof_room and not is_at_rafters
):
return None
elif thickness.endswith("+"):
@ -280,6 +292,7 @@ def get_roof_u_value(
is_loft=is_loft,
is_roof_room=is_roof_room,
is_thatched=is_thatched,
is_at_rafters=is_at_rafters
)
if u_value is not None:
@ -676,7 +689,7 @@ def estimate_windows(
property_type, built_form, construction_age_band, floor_area, number_habitable_rooms
):
# If there is an extension, that will boost the number of habitable rooms
# Base window count based on habitable rooms
window_count = number_habitable_rooms

View file

@ -276,3 +276,86 @@ def list_files_in_s3_folder(bucket_name, folder_name):
except Exception as e:
logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
return []
def list_files_and_subfolders_in_s3_folder(bucket_name, folder_name):
"""
List all files and immediate subfolders in a given folder in an S3 bucket.
E.g. if we have a folder structure in S3 like this:
- folder1/
- file1.csv
- file2.csv
- subfolder1/
- file3.csv
Then calling list_files_and_subfolders_in_s3_folder(bucket_name='my-bucket', folder_name='folder1/')
would return ['folder1/file1.csv', 'folder1/file2.csv', 'folder1/subfolder1/'].
Namely, the nested files are not included in the list, only the immediate files and subfolders.
:param bucket_name: The name of the S3 bucket.
:param folder_name: The folder name within the S3 bucket.
:return: A list of file keys and subfolder prefixes in the specified S3 folder.
"""
# For this function, folder_name should end with a forward slash
if not folder_name.endswith('/'):
folder_name += '/'
try:
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name, Delimiter='/')
items = []
# Add files to the list
if 'Contents' in response:
items.extend([content['Key'] for content in response['Contents'] if content['Key'] != folder_name])
# Add immediate subfolders to the list
if 'CommonPrefixes' in response:
items.extend([prefix['Prefix'] for prefix in response['CommonPrefixes']])
return items
except NoCredentialsError:
logger.error("Credentials not available.")
return []
except PartialCredentialsError:
logger.error("Incomplete credentials provided.")
return []
except Exception as e:
logger.error(f'Failed to list files and subfolders in folder {folder_name} in bucket {bucket_name}: {str(e)}')
return []
def list_xmls_in_s3_folder(bucket_name, folder_name):
"""
List all XML files in a given folder in an S3 bucket.
:param bucket_name: The name of the S3 bucket.
:param folder_name: The folder name within the S3 bucket.
:return: A list of XML file keys in the specified S3 folder.
"""
try:
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
if 'Contents' not in response:
logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.")
return []
# Filter XML files
xml_files = [content['Key'] for content in response['Contents'] if content['Key'].endswith('.xml')]
return xml_files
except NoCredentialsError:
logger.error("Credentials not available.")
return []
except PartialCredentialsError:
logger.error("Incomplete credentials provided.")
return []
except Exception as e:
logger.error(f'Failed to list XML files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
return []