Merge pull request #134 from Hestia-Homes/main

Completed the recommendations api with the optimiser and portfolio aggregations
This commit is contained in:
KhalimCK 2023-08-21 19:47:30 +01:00 committed by GitHub
commit f076cb3fb8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
46 changed files with 1830 additions and 760 deletions

View file

@ -2,10 +2,10 @@ from datetime import datetime
import re
from epc_api.client import EpcClient
from model_data.config import EPC_AUTH_TOKEN
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
class Property(BaseUtility):
class Property(Definitions):
ATTRIBUTE_MAP = {
"floor-description": "floor",
"hotwater-description": "hotwater",
@ -51,6 +51,8 @@ class Property(BaseUtility):
self.heat_loss_corridor = None
self.mains_gas = None
self.floor_height = None
self.insulation_wall_area = None
self.floor_area = None
if epc_client:
self.epc_client = epc_client
@ -241,6 +243,8 @@ class Property(BaseUtility):
self.set_heat_loss_corridor()
self.set_mains_gas()
self.set_floor_height()
self.set_wall_area()
self.set_floor_area()
for description, attribute in cleaned.items():
@ -424,3 +428,22 @@ class Property(BaseUtility):
}
return property_details_epc
def set_wall_area(self):
    """
    Placeholder estimate of the property's insulatable wall area.

    No wall-area model is applied yet: the value written to
    ``self.insulation_wall_area`` is a random draw between 60 and 100.
    """
    from random import uniform

    lower_bound, upper_bound = 60, 100
    self.insulation_wall_area = uniform(lower_bound, upper_bound)
def set_floor_area(self):
    """
    Store the EPC "total-floor-area" value on the instance as a float.
    """
    # The number of floors is not known at the moment, so a single floor is
    # assumed; Verisk data will be needed to refine this.
    raw_floor_area = self.data["total-floor-area"]
    self.floor_area = float(raw_floor_area)

View file

@ -0,0 +1,12 @@
from backend.app.db.models.materials import Material
def get_materials(session):
    """
    Fetch every material whose ``is_active`` flag is set.

    :param session: The database session
    :return: A list of Material objects, or an empty list when none match.
    """
    active_materials = session.query(Material).filter(Material.is_active).all()
    if not active_materials:
        return []
    return active_materials

View file

@ -0,0 +1,35 @@
from sqlalchemy import func
from backend.app.db.models.recommendations import Plan, PlanRecommendations, Recommendation
from backend.app.db.models.portfolio import Portfolio
def aggregate_portfolio_recommendations(session, portfolio_id: int):
    """
    Recompute the portfolio-level totals from its default recommendations.

    Sums ``estimated_cost`` over recommendations flagged as default that are
    linked (through PlanRecommendations) to default plans of the portfolio,
    then writes the total onto the Portfolio row. The change is flushed but
    not committed; committing is left to the caller.

    :param session: The database session
    :param portfolio_id: The ID of the portfolio to aggregate
    """
    # "== True" is intentional: it builds a SQL expression rather than
    # comparing Python values.
    default_only = (
        Plan.portfolio_id == portfolio_id,
        Plan.is_default == True,  # noqa: E712
        Recommendation.default == True,  # noqa: E712
    )

    # For future usage more fields will be aggregated in this step, e.g.
    # func.sum(Recommendation.heat_demand).label("total_heat_demand"),
    # func.sum(Recommendation.energy_savings).label("total_energy_savings")
    totals_query = session.query(func.sum(Recommendation.estimated_cost).label("cost"))
    totals_query = totals_query.join(
        PlanRecommendations, PlanRecommendations.recommendation_id == Recommendation.id
    )
    totals_query = totals_query.join(Plan, Plan.id == PlanRecommendations.plan_id)
    totals = totals_query.filter(*default_only).one()

    # SUM over zero rows yields NULL, so fall back to 0.
    portfolio_updates = {
        "cost": totals.cost or 0,
        # "total_heat_demand": totals.total_heat_demand or 0,
        # "total_energy_savings": totals.total_energy_savings or 0
    }

    portfolio = session.query(Portfolio).filter_by(id=portfolio_id).one()
    for field_name in portfolio_updates:
        setattr(portfolio, field_name, portfolio_updates[field_name])

    # Merge the updated portfolio back into the session
    session.merge(portfolio)
    session.flush()

View file

@ -3,120 +3,128 @@
###
import datetime
import pytz
from sqlalchemy.orm import sessionmaker
from backend.app.db.models.portfolio import (
PropertyModel, PropertyCreationStatus, PortfolioStatus, PropertyTargetsModel, PropertyDetailsEpcModel
)
from backend.app.db.connection import db_engine
from sqlalchemy.orm.exc import NoResultFound
def create_property(portfolio_id: int, address: str, postcode: str) -> (int, bool):
def create_property(session, portfolio_id: int, address: str, postcode: str) -> (int, bool):
"""
This function will create a record for the property in the database if it does not exist.
If it does exist, it will just update the updated_at field.
:param session: The database session
:param portfolio_id: The ID of the portfolio the property belongs to
:param address: The address of the property
:param postcode: The postcode of the property
:return: The ID of the property and a boolean indicating whether it was created or not
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
address=address, postcode=postcode, portfolio_id=portfolio_id
).one()
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
address=address, postcode=postcode, portfolio_id=portfolio_id
).one()
# Update the 'updated_at' field
existing_property.updated_at = datetime.datetime.now(pytz.utc)
# Update the 'updated_at' field
existing_property.updated_at = datetime.datetime.now(pytz.utc)
# Merge the updated property back into the session
session.merge(existing_property)
session.commit()
# Merge the updated property back into the session
session.merge(existing_property)
session.flush()
return existing_property.id, False
return existing_property.id, False
except NoResultFound:
# Property doesn't exist, create a new one
new_property = PropertyModel(
address=address,
postcode=postcode,
portfolio_id=portfolio_id,
creation_status=PropertyCreationStatus.LOADING,
status=PortfolioStatus.ASSESSMENT.value,
has_pre_condition_report=False,
has_recommendations=False
)
except NoResultFound:
# Property doesn't exist, create a new one
new_property = PropertyModel(
address=address,
postcode=postcode,
portfolio_id=portfolio_id,
creation_status=PropertyCreationStatus.LOADING,
status=PortfolioStatus.ASSESSMENT.value,
has_pre_condition_report=False,
has_recommendations=False
)
# Add the new property to the session
session.add(new_property)
# Add the new property to the session
session.add(new_property)
session.commit()
session.flush()
return new_property.id, True
return new_property.id, True
def create_property_targets(property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None):
def create_property_targets(session, property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None):
"""
This function will create a record for the property targets in the database if it does not exist.
:param session: The database session
:param property_id: The ID of the property the targets belong to
:param portfolio_id: The ID of the portfolio the property belongs to
:param epc_target: Goal EPC value for the property
:param heat_demand_target: Heat demand target for the property in kwh/m^2/year
:return:
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
new_target = PropertyTargetsModel(
property_id=property_id,
portfolio_id=portfolio_id,
epc=epc_target,
heat_demand=heat_demand_target
)
session.add(new_target)
session.commit()
new_target = PropertyTargetsModel(
property_id=property_id,
portfolio_id=portfolio_id,
epc=epc_target,
heat_demand=heat_demand_target
)
session.add(new_target)
session.flush()
return True
def update_property_data(property_id: int, portfolio_id: int, property_data: dict):
Session = sessionmaker(bind=db_engine)
def update_property_data(session, property_id: int, portfolio_id: int, property_data: dict):
now = datetime.datetime.now(pytz.utc)
with Session() as session:
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
id=property_id, portfolio_id=portfolio_id
).one()
# Update the fields with the data in property_data
for key, value in property_data.items():
setattr(existing_property, key, value)
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
id=property_id, portfolio_id=portfolio_id
).one()
existing_property.updated_at = now
# Update the fields with the data in property_data
for key, value in property_data.items():
setattr(existing_property, key, value)
# Merge the updated property back into the session and commit
session.merge(existing_property)
session.commit()
existing_property.updated_at = now
except NoResultFound:
raise Exception(f"Property with property_id {property_id} and portfolio_id {portfolio_id} not found")
# Merge the updated property back into the session and flush
session.merge(existing_property)
session.flush()
except NoResultFound:
raise Exception(f"Property with property_id {property_id} and portfolio_id {portfolio_id} not found")
return True
def create_property_details_epc(property_details_epc: dict):
def create_property_details_epc(session, property_details_epc: dict):
"""
This function will create a record for the property details EPC in the database.
This function will create or update a record for the property details EPC in the database.
:param session: The database session
:param property_details_epc: A dictionary containing details about the property EPC.
:return: True if successful, False otherwise.
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
existing_record = session.query(PropertyDetailsEpcModel).filter_by(
portfolio_id=property_details_epc["portfolio_id"],
property_id=property_details_epc["property_id"]
).first()
if existing_record:
# If the record exists, update its fields
for key, value in property_details_epc.items():
setattr(existing_record, key, value)
else:
# If the record doesn't exist, create a new one
new_property_details_epc = PropertyDetailsEpcModel(**property_details_epc)
session.add(new_property_details_epc)
session.commit()
session.flush()
return True

View file

@ -0,0 +1,112 @@
from sqlalchemy import insert
from backend.app.db.models.recommendations import Plan, Recommendation, RecommendationMaterials, PlanRecommendations
def create_plan(session, plan):
    """
    Insert a new plan record and return its ID.

    A new row is always added; no check for an existing plan is made.

    :param session: The database session
    :param plan: dictionary of data representing a plan to be created
    :return: The ID of the newly created plan
    """
    plan_record = Plan(**plan)
    session.add(plan_record)
    # Flush (without committing) so the generated ID can be read back.
    session.flush()
    return plan_record.id
def create_recommendation(session, recommendation):
    """
    Insert a new recommendation record and return its ID.

    A new row is always added; no check for an existing recommendation is made.

    :param session: The database session
    :param recommendation: dictionary of data representing a recommendation to be created
    :return: The ID of the newly created recommendation
    """
    recommendation_record = Recommendation(**recommendation)
    session.add(recommendation_record)
    # Flush (without committing) so the generated ID can be read back.
    session.flush()
    return recommendation_record.id
def create_recommendation_material(session, recommendation_id, material_id, depth):
    """
    Insert a new recommendation_materials record and return its ID.

    A new row is always added; no check for an existing link is made.

    :param session: The database session
    :param recommendation_id: ID of the recommendation
    :param material_id: ID of the material
    :param depth: depth of the material, may be null if a material where depth is not applicable
    :return: The ID of the newly created record
    """
    # NOTE(review): the RecommendationMaterials model declares depth, quantity,
    # quantity_unit and estimated_cost as non-nullable, but only depth is set
    # here - confirm whether this helper is still usable as-is.
    link_fields = {
        "recommendation_id": recommendation_id,
        "material_id": material_id,
        "depth": depth,
    }
    link_record = RecommendationMaterials(**link_fields)
    session.add(link_record)
    # Flush (without committing) so the generated ID can be read back.
    session.flush()
    return link_record.id
def create_plan_recommendations(session, plan_id, recommendation_ids):
    """
    Link a plan to a set of recommendations in the plan_recommendations table.

    :param session: The database session
    :param plan_id: ID of the plan
    :param recommendation_ids: iterable of recommendation IDs; may be empty,
        in which case nothing is written
    """
    # Prepare a list of dictionaries for bulk insert
    rows = [{"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids]

    # Nothing to link: skip the statement rather than emitting an INSERT
    # that carries no row data.
    if not rows:
        return

    # Bulk insert using SQLAlchemy's core API
    session.execute(insert(PlanRecommendations).values(rows))
def upload_recommendations(session, recommendations_to_upload, property_id):
    """
    Insert a property's recommendations together with their materials.

    Each recommendation is inserted as an ORM object so that, after the flush,
    its generated ID can be read straight off the object. This keeps the IDs
    in the same order as ``recommendations_to_upload``, which the positional
    pairing of recommendations with their ``parts`` below relies on.
    (Re-querying the IDs by property and description gives no ordering
    guarantee and also picks up older rows with the same description.)

    :param session: The database session
    :param recommendations_to_upload: list of recommendation dictionaries; each
        needs "type", "description", "cost", "default", "sap_points" and
        "parts", and may carry "starting_u_value" / "new_u_value"
    :param property_id: ID of the property the recommendations belong to
    :return: IDs of the inserted recommendations, in input order
    """
    recommendation_records = [
        Recommendation(
            property_id=property_id,
            type=rec["type"],
            description=rec["description"],
            estimated_cost=rec["cost"],
            default=rec["default"],
            starting_u_value=rec.get("starting_u_value"),
            new_u_value=rec.get("new_u_value"),
            sap_points=rec["sap_points"],
        )
        for rec in recommendations_to_upload
    ]
    session.add_all(recommendation_records)

    # Flush so the database assigns the primary keys of the new rows
    session.flush()
    uploaded_recommendation_ids = [record.id for record in recommendation_records]

    # Prepare data for bulk insert for RecommendationMaterials
    # NOTE(review): depth falls back to None when a part has no depths, while
    # the RecommendationMaterials model declares depth as non-nullable -
    # confirm which side is intended.
    recommendation_materials_data = [
        {
            "recommendation_id": recommendation_id,
            "material_id": part["id"],
            "depth": part["depths"][0] if part["depths"] else None,
            "quantity": part["quantity"],
            "quantity_unit": part["quantity_unit"],
            "estimated_cost": part["estimated_cost"],
        }
        for rec, recommendation_id in zip(recommendations_to_upload, uploaded_recommendation_ids)
        for part in rec["parts"]
    ]
    session.bulk_insert_mappings(RecommendationMaterials, recommendation_materials_data)

    # Flush the material rows as well; committing is left to the caller
    session.flush()

    return uploaded_recommendation_ids

View file

@ -0,0 +1,52 @@
import enum
from sqlalchemy import Column, Integer, String, Float, Enum, TIMESTAMP, Boolean
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
Base = declarative_base()
class MaterialType(enum.Enum):
    # Kinds of insulation material; each member's value is identical to its name.
    suspended_floor_insulation = "suspended_floor_insulation"
    solid_floor_insulation = "solid_floor_insulation"
    external_wall_insulation = "external_wall_insulation"
    internal_wall_insulation = "internal_wall_insulation"
class DepthUnit(enum.Enum):
    # Unit in which material depths are expressed.
    mm = "mm"
class CostUnit(enum.Enum):
    # Unit in which material cost is expressed.
    gbp_sq_meter = "gbp_sq_meter"
class RValueUnit(enum.Enum):
    # Unit of the material's R-value.
    square_meter_kelvin_per_watt = "square_meter_kelvin_per_watt"
class ThermalConductivityUnit(enum.Enum):
    # Unit of the material's thermal conductivity.
    watt_per_meter_kelvin = "watt_per_meter_kelvin"
def _enum_values(enum_cls):
    """Return the ``.value`` of every member of ``enum_cls``, in definition order."""
    return [member.value for member in enum_cls]


class Material(Base):
    """
    ORM model for the ``material`` table.

    Enum-typed columns are persisted using the members' values (not their
    names), via ``values_callable``.
    """
    __tablename__ = 'material'

    id = Column(Integer, primary_key=True, autoincrement=True)
    type = Column(Enum(MaterialType, values_callable=_enum_values), nullable=False)
    description = Column(String, nullable=False)
    depths = Column(String)  # You may want to use a specific JSON type depending on the database
    depth_unit = Column(Enum(DepthUnit, values_callable=_enum_values), nullable=False)
    cost = Column(String)
    cost_unit = Column(Enum(CostUnit, values_callable=_enum_values), nullable=False)
    r_value_per_mm = Column(Float)
    r_value_unit = Column(Enum(RValueUnit, values_callable=_enum_values), nullable=False)
    thermal_conductivity = Column(Float)
    thermal_conductivity_unit = Column(
        Enum(ThermalConductivityUnit, values_callable=_enum_values),
        nullable=False,
    )
    link = Column(String)
    # Filled in by the database at insert time.
    created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
    is_active = Column(Boolean, nullable=False, default=True)

View file

@ -0,0 +1,61 @@
from sqlalchemy import Column, BigInteger, String, Float, Boolean, TIMESTAMP, ForeignKey, Enum
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
from backend.app.db.models.portfolio import Portfolio, PropertyModel
from backend.app.db.models.materials import Material
from datatypes.enums import QuantityUnits
Base = declarative_base()
class Recommendation(Base):
    # ORM model for the 'recommendation' table: one row per recommendation
    # produced for a property.
    __tablename__ = 'recommendation'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    # Foreign key to the property this recommendation was produced for.
    property_id = Column(BigInteger, ForeignKey(PropertyModel.id), nullable=False)
    # Filled in by the database at insert time.
    created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
    type = Column(String, nullable=False)
    description = Column(String, nullable=False)
    estimated_cost = Column(Float)
    # Required flag; presumably marks the recommendations selected as the default set - confirm.
    default = Column(Boolean, nullable=False)
    starting_u_value = Column(Float)
    new_u_value = Column(Float)
    sap_points = Column(Float)
    # The remaining columns are optional floats.
    heat_demand = Column(Float)
    co2_equivalent_savings = Column(Float)
    energy_savings = Column(Float)
    energy_cost_savings = Column(Float)
    property_valuation_increase = Column(Float)
    rental_yield_increase = Column(Float)
    total_work_hours = Column(Float)
class RecommendationMaterials(Base):
    # ORM model for the 'recommendation_materials' table: links a
    # recommendation to a material, with the amount and cost of that material.
    __tablename__ = 'recommendation_materials'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    recommendation_id = Column(BigInteger, ForeignKey('recommendation.id'), nullable=False)
    material_id = Column(BigInteger, ForeignKey(Material.id), nullable=False)
    # Filled in by the database at insert time.
    created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
    # NOTE(review): declared non-nullable, but upload_recommendations passes
    # None when a part has no depths - confirm which is intended.
    depth = Column(Float, nullable=False)
    quantity = Column(Float, nullable=False)
    # Stored using the enum members' values rather than their names.
    quantity_unit = Column(Enum(QuantityUnits, values_callable=lambda x: [e.value for e in x]), nullable=False)
    estimated_cost = Column(Float, nullable=False)
class Plan(Base):
    # ORM model for the 'plan' table: a plan belongs to one portfolio and one
    # property; its recommendations are linked through 'plan_recommendations'.
    __tablename__ = 'plan'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    portfolio_id = Column(BigInteger, ForeignKey(Portfolio.id), nullable=False)
    property_id = Column(BigInteger, ForeignKey(PropertyModel.id), nullable=False)
    # Filled in by the database at insert time.
    created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
    # Required flag marking the default plan.
    is_default = Column(Boolean, nullable=False)
class PlanRecommendations(Base):
    # ORM model for the 'plan_recommendations' association table: one row per
    # (plan, recommendation) pair.
    __tablename__ = 'plan_recommendations'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    plan_id = Column(BigInteger, ForeignKey('plan.id'), nullable=False)
    recommendation_id = Column(BigInteger, ForeignKey('recommendation.id'), nullable=False)

18
backend/app/db/utils.py Normal file
View file

@ -0,0 +1,18 @@
import enum
def row2dict(row):
    """
    Convert a SQLAlchemy model instance into a plain dictionary.

    Keys are the column names of ``row.__table__``; enum values are replaced
    by their ``.value``. May not be the best practice implementing it like
    this, but it works for the moment.
    """
    def _plain(value):
        # Unwrap enum members so the result only holds plain values.
        return value.value if isinstance(value, enum.Enum) else value

    return {
        column.name: _plain(getattr(row, column.name))
        for column in row.__table__.columns
    }

View file

@ -11,17 +11,32 @@ from utils.logger import setup_logger
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from utils.uvalue_estimates import classify_decile_newvalues
from backend.app.db.utils import row2dict
from starlette.responses import Response
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
# database interaction functions
from backend.app.db.functions.property_functions import (
create_property, create_property_targets, update_property_data, create_property_details_epc
)
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.recommendations_functions import (
create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations,
upload_recommendations
)
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.connection import db_engine
from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from model_data.utils import epc_to_sap_lower_bound
from model_data.optimiser.optimiser_functions import prepare_input_measures
# TODO: This is placeholder until data is stored in DB
from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls
from backend.app.plan.uvalue_estimates_floors import uvalue_estimates_floors
from backend.app.plan.temp_cleaned_data import cleaned
from backend.app.plan.temp_materials_db import materials
logger = setup_logger()
@ -81,10 +96,11 @@ lighting_averages = [
]
def get_materials(materials):
def filter_materials(materials):
materials_by_type = defaultdict(list)
for material in materials:
material = row2dict(material)
material_type = material["type"]
materials_by_type[material_type].append(material)
@ -94,148 +110,287 @@ def get_materials(materials):
return materials_by_type
def insert_temp_recommendation_id(property_recommendations):
    """
    Tag every recommendation with a temporary, sequential "recommendation_id".

    The ID is needed to split recommendations into default and non-default
    once the optimiser has been run. Numbering starts at 0 and runs across all
    groups in order; the dictionaries are modified in place.

    :param property_recommendations: nested list of recommendations, grouped by type
    :return: the same nested list, where each recommendation has an integer
        "recommendation_id" inserted
    """
    every_recommendation = (
        rec for group in property_recommendations for rec in group
    )
    for temp_id, rec in enumerate(every_recommendation):
        rec["recommendation_id"] = temp_id
    return property_recommendations
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting the inputs")
# Read in the trigger file from s3
bucket_name = get_settings().PLAN_TRIGGER_BUCKET
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
logger.info("Connecting to db")
Session = sessionmaker(bind=db_engine)
session = Session()
plan_input = read_csv_from_s3(bucket_name=bucket_name, filepath=body.trigger_file_path)
try:
session.begin()
logger.info("Getting the inputs")
# Read in the trigger file from s3
bucket_name = get_settings().PLAN_TRIGGER_BUCKET
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implement validation
plan_input = read_csv_from_s3(bucket_name=bucket_name, filepath=body.trigger_file_path)
# Create a record in db
property_id, is_new = create_property(
portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implement validation
# if a new record was not created, we don't produce recommendations
if not is_new:
continue
# TODO: Need to add heat demand target
create_property_targets(
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
input_properties.append(
Property(
postcode=config['postcode'],
address1=config['address'],
epc_client=epc_client,
id=property_id
# Create a record in db
property_id, is_new = create_property(
session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
)
)
logger.info("Getting EPC data")
for p in input_properties:
p.search_address_epc()
p.set_year_built()
# if a new record was not created, we don't produce recommendations
if not is_new:
continue
logger.info("Getting coordinates")
# This is placeholder, until the full dataset is loaded into the database
for p in input_properties:
coordinate_data = [x for x in open_uprn_data if x['UPRN'] == int(p.data['uprn'])][0]
p.set_coordinates(coordinate_data)
# TODO: Need to add heat demand target
create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
logger.info("Check if property is in conservation area")
for p in input_properties:
in_conservation_area = [x for x in in_conservation_area_data if x['uprn'] == int(p.data['uprn'])][0].get(
"is_in_conservation_area"
)
p.set_is_in_conservation_area(in_conservation_area)
input_properties.append(
Property(
postcode=config['postcode'],
address1=config['address'],
epc_client=epc_client,
id=property_id
)
)
# The materials data could be cached or local so we don't need to make
# consistent requrests to the backend for
# the same data
materials_by_type = get_materials(materials)
if not input_properties:
return Response(status_code=204)
logger.info("Getting components and properties recommendations")
recommendations = []
for property_id, p in enumerate(input_properties):
# For each property, classiy floor area decide
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=floors_decile_data["decile_boundaries"],
decile_labels=floors_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
logger.info("Getting EPC data")
for p in input_properties:
p.search_address_epc()
p.set_year_built()
# Property recommendations
p.get_components(cleaned)
logger.info("Getting coordinates")
# This is placeholder, until the full dataset is loaded into the database
for p in input_properties:
coordinate_data = [x for x in open_uprn_data if x['UPRN'] == int(p.data['uprn'])][0]
p.set_coordinates(coordinate_data)
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
floors_u_value_estimate = [
x for x in uvalue_estimates_floors
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['floor-energy-eff'] == p.data["floor-energy-eff"] if p.data["floor-energy-eff"] != 'N/A' else True) &
(x['floor-env-eff'] == p.data["floor-env-eff"] if p.data["floor-env-eff"] != 'N/A' else True)
]
logger.info("Check if property is in conservation area")
for p in input_properties:
in_conservation_area = [x for x in in_conservation_area_data if x['uprn'] == int(p.data['uprn'])][0].get(
"is_in_conservation_area"
)
p.set_is_in_conservation_area(in_conservation_area)
# Floor recommendations
floor_recommender = FloorRecommendations(
property_instance=p, uvalue_estimates=floors_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile
)
floor_recommender.recommend()
# insert property id
for rec in floor_recommender.recommendations:
rec["property_id"] = property_id
# The materials data could be cached or local so we don't need to make
# consistent requrests to the backend for
# the same data
# TODO: It might not be the best choice to store the materials data in a database table since this
# table probably won't be very large and won't be updated that often. It might be better to
# store this data in s3 and load it into memory when the app starts up. We will test this
recommendations.extend(floor_recommender.recommendations)
materials = get_materials(session)
materials_by_type = filter_materials(materials)
# Wall recommendations
# We would make this u-value query directly to the database
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=walls_decile_data["decile_boundaries"],
decile_labels=walls_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
logger.info("Getting components and properties recommendations")
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
walls_u_value_estimate = [
x for x in uvalue_estimates_walls
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['walls-energy-eff'] == p.data["walls-energy-eff"] if p.data["walls-energy-eff"] != 'N/A' else True) &
(x['walls-env-eff'] == p.data["walls-env-eff"] if p.data["walls-env-eff"] != 'N/A' else True)
]
# TODO: Move this to a class. We probably want a Recommender class which takes the optimisers
# in as an injected dependency, and then the optimisers can take the input measures in as part of the setup() method
recommendations = {}
for p in input_properties:
property_recommendations = []
wall_recomendations = WallRecommendations(
property_instance=p,
uvalue_estimates=walls_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["external_wall_insulation"] + materials_by_type["internal_wall_insulation"]
)
wall_recomendations.recommend()
# insert property id
for rec in wall_recomendations.recommendations:
rec["property_id"] = property_id
# For each property, classiy floor area decide
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=floors_decile_data["decile_boundaries"],
decile_labels=floors_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
recommendations.extend(wall_recomendations.recommendations)
# Property recommendations
p.get_components(cleaned)
# Once we're done, we'll store:
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
floors_u_value_estimate = [
x for x in uvalue_estimates_floors
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['floor-energy-eff'] == p.data["floor-energy-eff"] if p.data[
"floor-energy-eff"] != 'N/A' else True) &
(x['floor-env-eff'] == p.data["floor-env-eff"] if p.data["floor-env-eff"] != 'N/A' else True)
]
# Upload property data
for p in input_properties:
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id, rating_lookup=rating_lookup)
create_property_details_epc(property_details_epc)
# Floor recommendations
floor_recommender = FloorRecommendations(
property_instance=p,
uvalue_estimates=floors_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["suspended_floor_insulation"] + materials_by_type["solid_floor_insulation"],
)
floor_recommender.recommend()
property_data = p.get_full_property_data()
update_property_data(property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data)
if floor_recommender.recommendations:
property_recommendations.append(floor_recommender.recommendations)
return {"recommendations": recommendations}
# Wall recommendations
# We would make this u-value query directly to the database
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=walls_decile_data["decile_boundaries"],
decile_labels=walls_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
walls_u_value_estimate = [
x for x in uvalue_estimates_walls
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['walls-energy-eff'] == p.data["walls-energy-eff"] if p.data[
"walls-energy-eff"] != 'N/A' else True) &
(x['walls-env-eff'] == p.data["walls-env-eff"] if p.data["walls-env-eff"] != 'N/A' else True)
]
wall_recomender = WallRecommendations(
property_instance=p,
uvalue_estimates=walls_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["external_wall_insulation"] + materials_by_type["internal_wall_insulation"]
)
wall_recomender.recommend()
if wall_recomender.recommendations:
property_recommendations.append(wall_recomender.recommendations)
# Use the optimiser to pick the default recommendations and decide if we need certain
# recommendations to get to the goal
property_recommendations = insert_temp_recommendation_id(property_recommendations)
if not property_recommendations:
continue
input_measures = prepare_input_measures(property_recommendations, body.goal)
if body.budget:
optimiser = GainOptimiser(input_measures, max_cost=body.budget)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
current_sap_points = int(p.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures, min_gain=target_sap_points - current_sap_points
)
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
selected_recommendations = {r["id"] for r in solution}
# We'll use the set of selected recommendations to filter the recommendations to upload
property_recommendations = [
[
{**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False}
for rec in recommendations_by_type
]
for recommendations_by_type in property_recommendations
]
# We'll also unlist the recommendations so they're a bit easier to handle from here onwards
property_recommendations = [
rec for recommendations_by_type in property_recommendations for rec in recommendations_by_type
]
recommendations[p.id] = property_recommendations
# Once we're done, we'll store:
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations
logger.info("Uploading recommendations to the database")
# Upload property data
for p in input_properties:
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id,
rating_lookup=rating_lookup)
create_property_details_epc(session, property_details_epc)
property_data = p.get_full_property_data()
update_property_data(session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data)
# Upload recommendations
recommendations_to_upload = recommendations.get(p.id, [])
if not recommendations_to_upload:
continue
# Create a plan
new_plan_id = create_plan(
session,
{
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True
}
)
# Upload recommendations
uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
# Finally, match the recommendation to the plan
create_plan_recommendations(
session,
plan_id=new_plan_id,
recommendation_ids=uploaded_recommendation_ids
)
logger.info("Creating portfolio aggregations")
# We implement this in the simplest way possible which will be just to query the database for all
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
# recommendation from being default to not default, we'll need to re-run this process to re-calculate
# the portfolio level impact
aggregate_portfolio_recommendations(session, portfolio_id=body.portfolio_id)
# Commit all changes at once
session.commit()
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database integrity error.")
except OperationalError:
logger.error("Database operational error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database operational error.")
except ValueError:
logger.error("Value error - possibly due to malformed data", exc_info=True)
session.rollback()
return Response(status_code=400, content="Bad request: malformed data.")
except Exception as e: # General exception handling
logger.error(f"An error occurred: {e}")
session.rollback()
return Response(status_code=500, content="An unexpected error occurred.")
finally:
session.close()
return Response(status_code=200)

View file

@ -1,242 +0,0 @@
# Catalogue of insulation products for suspended floors.
# Each entry is a plain dict describing one product: the depths it is available in, its R-value per mm of depth,
# its thermal conductivity and an example supplier link. "cost" and "cost_unit" are not populated (None) here.
# For both entries r_value_per_mm equals 1 / (thermal_conductivity * 1000).
suspended_floor_insulation_parts = [
    {
        # Example product
        # All product types here:
        # https://www.insulationsuperstore.co.uk/browse/insulation/brand/recticel/filterby/application/floors.html
        "id": 1,
        "type": "suspended_floor_insulation",
        "description": "Rigid Insulation Foam Boards",
        "depths": [25, 30, 40, 50, 60, 70, 75, 80, 90, 100, 110, 120, 130, 140, 150],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.04545454545454546,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.022,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        # The link is split over two adjacent string literals, which Python joins into one string
        "link": "https://www.insulationsuperstore.co.uk/product/recticel-eurothane-general-purpose-pir-insulation"
                "-board-2400-x-1200-x-100mm.html"
    },
    {
        # All product types here:
        # https://www.insulationsuperstore.co.uk/browse/insulation/brand/rockwool/filterby/application/floors
        # /material/mineral-wool.html
        "id": 2,
        "type": "suspended_floor_insulation",
        "description": "Mineral Wool Floor Insulation",
        "depths": [25, 40, 50, 60, 75, 100],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.02857142857142857,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.035,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.insulationsuperstore.co.uk/product/rockwool-rwa45-acoustic-insulation-slab-100mm-2-88m2"
                "-pack.html"
    },
]
# Catalogue of insulation products for solid floors, using the same dict layout as the other part lists
# (id, type, description, depths, cost, R-value per mm, thermal conductivity, link).
solid_floor_insulation_parts = [
    {
        # All product types here:
        # https://www.insulationexpress.co.uk/floor-insulation/solid-floor-insulation?brand=7015&p=1
        # Example screed https://www.screwfix.com/p/mapei-ultraplan-3240-self-levelling-compound-25kg/4959f
        "id": 3,
        "type": "solid_floor_insulation",
        "description": "Rigid Insulation Foam Boards with floor screed",
        "depths": [25, 50, 70, 75, 100],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.04545454545454546,
        "r_value_unit": "square_meter_kelvin_per_watt",
        # NOTE(review): 1 / (0.0526... * 1000) is ~0.019, which does not match r_value_per_mm above
        # (0.04545... corresponds to a conductivity of 0.022). One of the two values looks wrong - confirm
        # against the product datasheet.
        "thermal_conductivity": 0.052631578947368425,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.insulationexpress.co.uk/floor-insulation/solid-floor-insulation/k103-100mm"
    },
]
# Catalogue of external wall insulation products, one dict per material family.
# Entries with an empty "depths" list and None for the R-value, conductivity and link (id 7) are placeholders
# with no product data filled in yet. "cost" and "cost_unit" are None for every entry.
external_wall_insulation_parts = [
    {
        "id": 4,
        "type": "external_wall_insulation",
        "description": "Mineral Wool External Wall Insulation",
        "depths": [30, 50, 70, 80, 90, 100, 150, 200],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.0278,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.036,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://insulationgo.co.uk/100mm-rockwool-external-wall-insulation-dual-density-slabs-a1-non"
                "-combustible-slab-ewi-render-fire/"
    },
    {
        "id": 5,
        "type": "external_wall_insulation",
        "description": "Expanded Polystyrene External Wall Insulation",
        "depths": [25, 50, 100, 125],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.02703,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.037,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.insulationking.co.uk/products/polystyrene-eps70?variant=44156186558759"
    },
    {
        "id": 6,
        "type": "external_wall_insulation",
        "description": "Phenolic Foam External Wall Insulation",
        "depths": [20, 50, 100],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.043478260869565216,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.023,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.insulationshop.co/20mm_kooltherm_k5_external_wall_kingspan.html"
    },
    {
        # Placeholder entry: no depths, R-value, conductivity or link recorded
        "id": 7,
        "type": "external_wall_insulation",
        "description": "Polyisocyanurate/Polyurethane Foam External Wall Insulation",
        "depths": [],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": None,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": None,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": None
    },
    {
        "id": 8,
        "type": "external_wall_insulation",
        "description": "Wood Fiber External Wall Insulation",
        "depths": [40, 60],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.023255813953488375,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.043,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.mikewye.co.uk/product/steico-duo-dry/"
    },
    {
        "id": 9,
        "type": "external_wall_insulation",
        "description": "Aerogel External Wall Insulation",
        "depths": [10, 20, 30, 40, 50, 60, 70],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.06666666666666667,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.015,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.thermablok.co.uk/site/wp-content/uploads/2022/09/Thermablok-Aerogel-Insulation-Blanket"
                "-TDS-AIS-and-Steel-Related-Details.pdf"
    },
    {
        # Product data is present but no supplier link is recorded
        "id": 10,
        "type": "external_wall_insulation",
        "description": "Vacuum Insulation Panels External Wall Insulation",
        "depths": [45, 60],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.16666666666666666,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.006,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": None
    }
]
# Catalogue of internal wall insulation products, one dict per material family.
# Entry id 14 is a placeholder (empty "depths", None for R-value, conductivity and link).
# "cost" and "cost_unit" are None for every entry.
internal_wall_insulation_parts = [
    {
        "id": 11,
        "type": "internal_wall_insulation",
        "description": "Rigid Insulation Boards Internal Wall Insulation",
        "depths": [25, 40, 50, 75, 100],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.026315789473684213,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.038,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.insulationshop.co/25mm_polystyrene_insulation_eps_70jablite.html"
    },
    {
        "id": 12,
        "type": "internal_wall_insulation",
        "description": "Mineral Wool Internal Wall Insulation",
        "depths": [140],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.02857142857142857,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.035,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.rockwool.com/siteassets/rw-uk/downloads/datasheets/flexi.pdf"
    },
    {
        "id": 13,
        "type": "internal_wall_insulation",
        "description": "Insulated Plasterboard Internal Wall Insulation",
        "depths": [25, 80],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        # NOTE(review): 0.02857... corresponds to a conductivity of 0.035, not the 0.019 given below
        # (1 / (0.019 * 1000) is ~0.0526). One of the two values looks wrong - confirm against the datasheet.
        "r_value_per_mm": 0.02857142857142857,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.019,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.kingspan.com/gb/en/products/insulation-boards/wall-insulation-boards/kooltherm-k118"
                "-insulated-plasterboard/"
    },
    {
        # Placeholder entry: no depths, R-value, conductivity or link recorded
        "id": 14,
        "type": "internal_wall_insulation",
        "description": "Reflective Internal Wall Insulation",
        "depths": [],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": None,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": None,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": None
    },
    {
        "id": 15,
        "type": "internal_wall_insulation",
        "description": "Vacuum Insulation Panels Wall Insulation",
        "depths": [20, 30],
        "depth_unit": "mm",
        "cost": None,
        "cost_unit": None,
        "r_value_per_mm": 0.125,
        "r_value_unit": "square_meter_kelvin_per_watt",
        "thermal_conductivity": 0.008,
        "thermal_conductivity_unit": "watt_per_meter_kelvin",
        "link": "https://www.insulationsuperstore.co.uk/product/vacutherm-vacupor-nt-b2-vacuum-insulated-panel-1m-x"
                "-600mm-x-30mm.html"
    },
]
# Flat list of every insulation product, in the order: suspended floor, solid floor, external wall, internal wall
materials = [
    *suspended_floor_insulation_parts,
    *solid_floor_insulation_parts,
    *external_wall_insulation_parts,
    *internal_wall_insulation_parts,
]

5
datatypes/enums.py Normal file
View file

@ -0,0 +1,5 @@
import enum
class QuantityUnits(enum.Enum):
    """
    Units in which a quantity can be expressed.
    """

    # Square metres
    m2 = "m2"

View file

@ -1,4 +1,4 @@
class BaseUtility:
class Definitions:
"""
This class contains some base attributes which are used across multiple other classes
"""
@ -38,7 +38,7 @@ class BaseUtility:
# addresses will take time to develop to deal with these and future anomalies.
#
# There are several fields within the lodged data where it is possible to enter multiple entries to cater for
# different types of build within a single property, i.e. extensions. This results in multiple entries for
# different types of build within a single property, i.e. extensions. This results in multiple entries for
# the description fields for floor, roof and wall. For the purposes of this data release only the information
# contained within the first of these multiple entries is being provided. As there are no restrictions on the
# value in this first field it means that sometimes the first field in a multiple entry description field may

View file

@ -22,7 +22,7 @@ LAND_REGISTRY_PATHS = [
def app():
"""
For a pre-defined list of constituencies and property types, we'll download EPC data from the API
For a pre-defined list of constituencies and property types, we'll download EPC data from the API
and produce a dataset of cleaned fields so that when we get new properties, we can quickly
sanitise any description data
:return:

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import extract_thermal_transmittance, extract_component_types
class FloorAttributes(BaseUtility):
class FloorAttributes(Definitions):
DWELLING_BELOW = ["another dwelling below", "other premises below"]
FLOOR_TYPES = ["assumed", "to unheated space", "to external air", "suspended", "solid"]

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, find_keyword
class HotWaterAttributes(BaseUtility):
class HotWaterAttributes(Definitions):
# HEATER_TYPES refer to the main devices used for heating water. These devices can be powered by different energy
# sources.
HEATER_TYPES = [

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, remove_punctuation, find_keyword
class MainFuelAttributes(BaseUtility):
class MainFuelAttributes(Definitions):
FUEL_KEYWORDS = [
'heat network',
'mains gas',
@ -96,7 +96,7 @@ class MainFuelAttributes(BaseUtility):
if not result["fuel_type"]:
result["fuel_type"] = self.UNKNOWN_FUEL
# We'll do checks on unknown fuel types to ensure we don't miss anything
# We'll do checks on unknown fuel types to ensure we don't miss anything
self.is_unknown = True
return result

View file

@ -1,9 +1,9 @@
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, process_part
from typing import Dict, Union
class MainHeatAttributes(BaseUtility):
class MainHeatAttributes(Definitions):
HEAT_SYSTEMS = [
"boiler", "air source heat pump", "room heaters", "electric storage heaters", "warm air",
"electric underfloor heating", "electric ceiling heating", "community scheme",

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, find_keyword
class MainheatControlAttributes(BaseUtility):
class MainheatControlAttributes(Definitions):
# These systems allow for the automatic regulation of temperature
THERMOSTATIC_CONTROL_KEYWORDS = [
'room thermostats',

View file

@ -1,10 +1,10 @@
import re
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import extract_component_types, extract_thermal_transmittance
class RoofAttributes(BaseUtility):
class RoofAttributes(Definitions):
ROOF_TYPES = ['pitched', 'roof room', 'loft', 'flat', 'thatched', 'at rafters', 'assumed']
DWELLING_ABOVE = ["another dwelling above", "other premises above"]

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import extract_component_types, extract_thermal_transmittance
class WallAttributes(BaseUtility):
class WallAttributes(Definitions):
WALL_TYPES = ['cavity wall', 'filled cavity', 'solid brick', 'system built', 'timber frame', 'granite or whinstone',
'as built', 'cob', 'assumed', 'sandstone or limestone']

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description
class WindowAttributes(BaseUtility):
class WindowAttributes(Definitions):
GLAZING_KEYWORDS = ["glazing", "glazed", "glaze"]
GLAZING_COVERAGE = ["fully", "mostly", "partial", "some", "full", "thoughout"]
GLAZING_TYPES = ["double", "triple", "secondary", "multiple", "high performance", "single"]

View file

@ -36,13 +36,13 @@ def extract_component_types(result: dict, description: str, list_of_components:
Dict[str, Union[None, str, float]], str
]:
"""
Extracts component types from the description, updates the result dictionary, and removes the matched component
types from the description.
Extracts component types from the description, updates the result dictionary, and removes the matched component
types from the description.
:param result: Dictionary to store the results in.
:param description: Lowercase description string.
:param list_of_components: List of component types to extract from the description.
:return: A tuple containing the updated result dictionary and the description with the matched component types
:param list_of_components: List of component types to extract from the description.
:return: A tuple containing the updated result dictionary and the description with the matched component types
removed.
"""
for component in list_of_components:

View file

@ -0,0 +1,68 @@
from mip import Model, xsum, minimize, BINARY
class CostOptimiser:
    """
    This class is used to minimise cost, given a constrained minimum gain

    ``components`` is a list of groups; each group is a list of dicts with at least the keys ``id``, ``cost``
    and ``gain``. At most one component is selected from each group. Call ``setup()`` to build the model and
    then ``solve()``; the results are stored on ``solution``, ``solution_cost`` and ``solution_gain``.
    """

    def __init__(self, components, min_gain):
        """
        :param components: Nested list of candidate components, one inner list per group
        :param min_gain: Minimum total gain the selected components must achieve
        """
        self.components = components
        self.min_gain = min_gain

        self.m = None
        self.variables = []
        self.solution = []
        self.solution_cost = None
        self.solution_gain = None

    def _component_variable_pairs(self):
        """Yield every (component, decision variable) pair, group by group."""
        for group, group_vars in zip(self.components, self.variables):
            yield from zip(group, group_vars)

    def setup(self):
        """
        Build the optimisation model: one binary decision variable per component, the cost objective,
        the minimum gain constraint and the "at most one per group" constraints.
        """
        # Initialize Model
        self.m = Model("knapsack")

        # Create variables, mirroring the nested structure of self.components
        self.variables = [
            [self.m.add_var(var_type=BINARY, name=str(component["id"])) for component in group]
            for group in self.components
        ]

        # Set objective
        # This objective is to minimize
        # cost_ig * x_ig, where cost_ig represents the cost for ith part in group g
        # and x_ig is the binary decision variable for the ith part in group g
        self.m.objective = minimize(
            xsum(component['cost'] * var for component, var in self._component_variable_pairs())
        )

        # Add constraints
        # This constraint ensures that sum of gain_ig * x_ig >= min_gain, where gain_ig represents the gain for
        # the ith component in group g, and x_ig is the binary decision variable for the ith component in group g
        self.m += xsum(
            component['gain'] * var for component, var in self._component_variable_pairs()
        ) >= self.min_gain

        # At most one item from each group
        # This is expressed by summing up the decision variables for each group and ensuring that the sum is <= 1
        for group_vars in self.variables:
            self.m += xsum(var for var in group_vars) <= 1

    def solve(self):
        """
        Run the optimisation and store the selected components, their total cost and their total gain.

        If the solver finds no solution (for example when ``min_gain`` cannot be reached), the decision
        variables carry no value; in that case ``solution`` is set to an empty list and ``solution_cost`` and
        ``solution_gain`` are set to None instead of raising.
        """
        # Solve the problem
        self.m.optimize()

        pairs = list(self._component_variable_pairs())

        # A variable's value is None when no solution is available, so comparing it to 0.99 would raise
        if any(var.x is None for _, var in pairs):
            self.solution = []
            self.solution_cost = None
            self.solution_gain = None
            return

        # Get the selected items; 0.99 rather than 1 to tolerate solver rounding on binary variables
        self.solution = [component for component, var in pairs if var.x >= 0.99]

        self.solution_cost = self.m.objective.x
        self.solution_gain = sum(component['gain'] for component in self.solution)

View file

@ -0,0 +1,70 @@
from mip import Model, xsum, maximize, BINARY
class GainOptimiser:
    """
    This class is used to maximise gain, given a constrained cost

    ``components`` is a list of groups; each group is a list of dicts with at least the keys ``id``, ``cost``
    and ``gain``. At most one component is selected from each group. Call ``setup()`` to build the model and
    then ``solve()``; the results are stored on ``solution``, ``solution_gain`` and ``solution_cost``.
    """

    def __init__(self, components, max_cost):
        """
        :param components: Nested list of candidate components, one inner list per group
        :param max_cost: Maximum total cost the selected components may have
        """
        self.components = components
        self.max_cost = max_cost

        self.m = None
        self.variables = []
        self.solution = []
        self.solution_gain = None
        self.solution_cost = None

    def _component_variable_pairs(self):
        """Yield every (component, decision variable) pair, group by group."""
        for group, group_vars in zip(self.components, self.variables):
            yield from zip(group, group_vars)

    def setup(self):
        """
        Build the optimisation model: one binary decision variable per component, the gain objective,
        the maximum cost constraint and the "at most one per group" constraints.
        """
        # Initialize Model
        self.m = Model("knapsack")

        # Create variables, mirroring the nested structure of self.components
        self.variables = [
            [self.m.add_var(var_type=BINARY, name=str(component["id"])) for component in group]
            for group in self.components
        ]

        # Set objective
        # This objective is the sum
        # gain_ig * x_ig, where gain_ig represents the gain for ith part in group g
        # and x_ig is the binary decision variable for the ith part in group g
        self.m.objective = maximize(
            xsum(component['gain'] * var for component, var in self._component_variable_pairs())
        )

        # Add constraints
        # This constraint ensures that sum of cost_ig * x_ig <= C, where cost_ig represents the cost for the ith
        # component in group g, and x_ig is the binary decision variable for the ith component in group g
        self.m += xsum(
            component['cost'] * var for component, var in self._component_variable_pairs()
        ) <= self.max_cost

        # At most one item from each group
        # This is expressed by summing up the decision variables for each group and ensuring that the sum is <= 1
        for group_vars in self.variables:
            self.m += xsum(var for var in group_vars) <= 1

    def solve(self):
        """
        Run the optimisation and store the selected components, their total gain and their total cost.

        If the solver finds no solution, the decision variables carry no value; in that case ``solution`` is
        set to an empty list and ``solution_gain`` and ``solution_cost`` are set to None instead of raising.
        """
        # Solve the problem
        self.m.optimize()

        pairs = list(self._component_variable_pairs())

        # A variable's value is None when no solution is available, so comparing it to 0.99 would raise
        if any(var.x is None for _, var in pairs):
            self.solution = []
            self.solution_gain = None
            self.solution_cost = None
            return

        # Get the selected items; 0.99 rather than 1 to tolerate solver rounding on binary variables
        self.solution = [component for component, var in pairs if var.x >= 0.99]

        self.solution_gain = self.m.objective.x
        self.solution_cost = sum(component['cost'] for component in self.solution)

View file

@ -1,200 +0,0 @@
from mip import Model, xsum, maximize, BINARY
from pprint import pprint
# Example parts
# Each list is one group of options; every option is a dict with an id, a cost, a gain and the name of the
# group it belongs to ("type"). Ids are only unique within a group.
wall = [
    {"id": 1, "cost": 2000, "gain": 5, "type": "wall"},
    {"id": 2, "cost": 2300, "gain": 6, "type": "wall"}
]
floor = [
    {"id": 1, "cost": 1500, "gain": 3, "type": "floor"},
    {"id": 2, "cost": 1600, "gain": 3.1, "type": "floor"}
]
roof = [
    {"id": 1, "cost": 1000, "gain": 2, "type": "roof"},
    {"id": 2, "cost": 1100, "gain": 2.3, "type": "roof"}
]
# To solve this, we are solving a constrained Knapsack problem
# Maximize sum(gain_g . x_g) for g in groups
# subject to sum(cost_g . x_g) <= C
# subject to sum(x_g) <= 1 for g in groups
# x_g in {0, 1} for g in groups
#
# The first sum, which is the objective of the optimisation problem, ensures that we are maximising the gain
# for the selected parts
# The second sum (and the first constraint) ensures that the cost of the selected parts is less than or equal to C
# The third sum (and the second constraint) ensures that at most one part from each group is selected
# The last constraint ensures that the decision variables are binary
# Group all the parts: one inner list per group, in the nested shape GainOptimiser expects
components = [wall, floor, roof]
class GainOptimiser:
    """
    This class is used to maximise gain, given a constrained cost

    ``components`` is a list of groups; each group is a list of dicts with at least the keys ``id``, ``cost``
    and ``gain``. At most one component is selected from each group. Call ``setup()`` to build the model and
    then ``solve()``; the results are stored on ``solution``, ``solution_gain`` and ``solution_cost``.
    """

    def __init__(self, components, max_cost):
        """
        :param components: Nested list of candidate components, one inner list per group
        :param max_cost: Maximum total cost the selected components may have
        """
        self.components = components
        self.max_cost = max_cost

        self.m = None
        self.variables = []
        self.solution = []
        self.solution_gain = None
        self.solution_cost = None

    def _component_variable_pairs(self):
        """Yield every (component, decision variable) pair, group by group."""
        for group, group_vars in zip(self.components, self.variables):
            yield from zip(group, group_vars)

    def setup(self):
        """
        Build the optimisation model: one binary decision variable per component, the gain objective,
        the maximum cost constraint and the "at most one per group" constraints.
        """
        # Initialize Model
        self.m = Model("knapsack")

        # Create variables, mirroring the nested structure of self.components
        self.variables = [
            [self.m.add_var(var_type=BINARY, name=str(component["id"])) for component in group]
            for group in self.components
        ]

        # Set objective
        # This objective is the sum
        # gain_ig * x_ig, where gain_ig represents the gain for ith part in group g
        # and x_ig is the binary decision variable for the ith part in group g
        self.m.objective = maximize(
            xsum(component['gain'] * var for component, var in self._component_variable_pairs())
        )

        # Add constraints
        # This constraint ensures that sum of cost_ig * x_ig <= C, where cost_ig represents the cost for the ith
        # component in group g, and x_ig is the binary decision variable for the ith component in group g
        self.m += xsum(
            component['cost'] * var for component, var in self._component_variable_pairs()
        ) <= self.max_cost

        # At most one item from each group
        # This is expressed by summing up the decision variables for each group and ensuring that the sum is <= 1
        for group_vars in self.variables:
            self.m += xsum(var for var in group_vars) <= 1

    def solve(self):
        """
        Run the optimisation and store the selected components, their total gain and their total cost.

        If the solver finds no solution, the decision variables carry no value; in that case ``solution`` is
        set to an empty list and ``solution_gain`` and ``solution_cost`` are set to None instead of raising.
        """
        # Solve the problem
        self.m.optimize()

        pairs = list(self._component_variable_pairs())

        # A variable's value is None when no solution is available, so comparing it to 0.99 would raise
        if any(var.x is None for _, var in pairs):
            self.solution = []
            self.solution_gain = None
            self.solution_cost = None
            return

        # Get the selected items; 0.99 rather than 1 to tolerate solver rounding on binary variables
        self.solution = [component for component, var in pairs if var.x >= 0.99]

        self.solution_gain = self.m.objective.x
        self.solution_cost = sum(component['cost'] for component in self.solution)
# First example: maximise gain over the wall/floor/roof groups with a total budget of 4000
opt = GainOptimiser(components, max_cost=4000)
# Set up the knapsack problem
# This sets the objective & constraints
opt.setup()
# Solve the problem
opt.solve()
# Print the selected components followed by their total cost and total gain
pprint(opt.solution)
print("total cost:", opt.solution_cost)
print("total gain:", opt.solution_gain)
# A bigger problem:
# Seven groups with five to seven options each. Note that wall, floor and roof are rebound here, replacing the
# smaller lists defined for the first example.
wall = [
    {"id": 1, "cost": 2000, "gain": 5, "type": "wall"},
    {"id": 2, "cost": 2300, "gain": 6, "type": "wall"},
    {"id": 3, "cost": 2200, "gain": 5.5, "type": "wall"},
    {"id": 4, "cost": 2500, "gain": 6.2, "type": "wall"},
    {"id": 5, "cost": 2100, "gain": 5.1, "type": "wall"},
    {"id": 6, "cost": 2400, "gain": 6.1, "type": "wall"},
    {"id": 7, "cost": 2000, "gain": 5.2, "type": "wall"}
]
floor = [
    {"id": 1, "cost": 1500, "gain": 3, "type": "floor"},
    {"id": 2, "cost": 1600, "gain": 3.1, "type": "floor"},
    {"id": 3, "cost": 1550, "gain": 3.2, "type": "floor"},
    {"id": 4, "cost": 1650, "gain": 3.3, "type": "floor"},
    {"id": 5, "cost": 1500, "gain": 3.4, "type": "floor"},
    {"id": 6, "cost": 1550, "gain": 3.5, "type": "floor"},
    {"id": 7, "cost": 1600, "gain": 3.6, "type": "floor"}
]
roof = [
    {"id": 1, "cost": 1000, "gain": 2, "type": "roof"},
    {"id": 2, "cost": 1100, "gain": 2.3, "type": "roof"},
    {"id": 3, "cost": 1200, "gain": 2.6, "type": "roof"},
    {"id": 4, "cost": 1300, "gain": 2.9, "type": "roof"},
    {"id": 5, "cost": 1100, "gain": 2.5, "type": "roof"},
    {"id": 6, "cost": 1200, "gain": 2.7, "type": "roof"},
    {"id": 7, "cost": 1300, "gain": 2.8, "type": "roof"}
]
heating = [
    {"id": 1, "cost": 3000, "gain": 7, "type": "heating"},
    {"id": 2, "cost": 3200, "gain": 7.2, "type": "heating"},
    {"id": 3, "cost": 3100, "gain": 7.1, "type": "heating"},
    {"id": 4, "cost": 3300, "gain": 7.3, "type": "heating"},
    {"id": 5, "cost": 3000, "gain": 7.4, "type": "heating"}
]
hot_water = [
    {"id": 1, "cost": 2500, "gain": 6.5, "type": "hot water"},
    {"id": 2, "cost": 2600, "gain": 6.6, "type": "hot water"},
    {"id": 3, "cost": 2500, "gain": 6.7, "type": "hot water"},
    {"id": 4, "cost": 2700, "gain": 6.8, "type": "hot water"},
    {"id": 5, "cost": 2500, "gain": 6.9, "type": "hot water"}
]
solar = [
    {"id": 1, "cost": 5000, "gain": 10, "type": "solar"},
    {"id": 2, "cost": 5500, "gain": 11, "type": "solar"},
    {"id": 3, "cost": 5300, "gain": 10.5, "type": "solar"},
    {"id": 4, "cost": 5200, "gain": 10.2, "type": "solar"},
    {"id": 5, "cost": 5400, "gain": 10.8, "type": "solar"}
]
heat_pumps = [
    {"id": 1, "cost": 4000, "gain": 9, "type": "heat pumps"},
    {"id": 2, "cost": 4200, "gain": 9.2, "type": "heat pumps"},
    {"id": 3, "cost": 4100, "gain": 9.1, "type": "heat pumps"},
    {"id": 4, "cost": 4300, "gain": 9.3, "type": "heat pumps"},
    {"id": 5, "cost": 4000, "gain": 9.4, "type": "heat pumps"}
]
# One inner list per group, in the nested shape GainOptimiser expects
components2 = [
    wall,
    floor,
    roof,
    heating,
    hot_water,
    solar,
    heat_pumps
]
# Maximise gain across all seven groups with a total budget of 15000
opt2 = GainOptimiser(components2, max_cost=15000)
# Setup
opt2.setup()
# Solve the problem
opt2.solve()
# Print the selected components followed by their total cost and total gain
pprint(opt2.solution)
print("total cost:", opt2.solution_cost)
print("total gain:", opt2.solution_gain)

View file

@ -0,0 +1,33 @@
def prepare_input_measures(property_recommendations, goal):
    """
    Basic function to convert the recommendations for a property into the nested list format that is
    suitable for the optimiser

    :param property_recommendations: iterable of groups of recommendations, created in the plan trigger api.
        Each recommendation is a dict with the keys "recommendation_id", "cost", "type" and the key that holds
        the gain for the chosen goal (e.g. "sap_points")
    :param goal: goal to be optimised for, should be one of the keys in goal_map. E.g. if the gain is SAP points,
        the goal should reflect that desired gain
    :return: Nested list of input measures, one inner list per group, where each measure is a dict with the
        keys "id", "cost", "gain" and "type"
    :raises NotImplementedError: if the goal is not one of the keys in goal_map
    """

    # Maps the user-facing goal to the recommendation field that holds the gain for that goal
    goal_map = {
        "Increase EPC": "sap_points"
    }

    # .get rather than indexing, so an unsupported goal reaches the explicit error below instead of a KeyError
    goal_key = goal_map.get(goal)
    if not goal_key:
        raise NotImplementedError("Not implemented this gain type - investigate me")

    return [
        [
            {
                "id": rec["recommendation_id"],
                "cost": rec["cost"],
                "gain": rec[goal_key],
                "type": rec["type"]
            }
            for rec in recs
        ]
        for recs in property_recommendations
    ]

View file

@ -0,0 +1,200 @@
from pathlib import Path
import numpy as np
import pandas as pd
from model_data.BaseUtility import Definitions
from simulation_system.Settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
FULLY_GLAZED_DESCRIPTIONS,
AVERAGE_FIXED_FEATURES,
FLOOR_HEIGHT_NATIONAL_AVERAGE,
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE,
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
COLUMNS_TO_MERGE_ON
)
from typing import List
class DataProcessor:
"""
Handle data loading and data preprocessing
"""
def __init__(self, filepath: Path) -> None:
self.filepath = filepath
def load_data(self, low_memory=False) -> None:
self.data = pd.read_csv(self.filepath, low_memory=low_memory)
def pre_process(self) -> pd.DataFrame:
"""
Load data and begin initial cleaning
"""
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory'])
self.confine_data()
# TODO: CLean number of heated rooms and habitable rooms
self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings'])
self.clean_multi_glaze_proportion()
self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count'])
self.remap_columns()
if DATA_PROCESSOR_SETTINGS['epc_minimum_count'] >= 1:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
return self.data
def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON):
"""
If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields
"""
# Each uprn can fille backward from recent and forward fill from oldest
# The groupby changes the order and we use the index to make the original data
filled_data = self.data.groupby("UPRN", group_keys=True)[columns_to_fill].apply(
lambda group: group.fillna(method='bfill').fillna(method='ffill')
).reset_index().set_index('level_1').sort_index()
self.data[columns_to_fill] = filled_data[columns_to_fill]
def remap_columns(self):
"""
Remap all columns, for any non values
"""
# Map all anomaly values to None
data_anomaly_map = dict(zip(Definitions.DATA_ANOMALY_MATCHES, [None] * len(Definitions.DATA_ANOMALY_MATCHES)))
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
data = self.data.replace(data_anomaly_map)
data = data.replace(np.NAN, None)
# Remap certain columns
data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
data['BUILT_FROM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP)
self.data = data
def make_cleaning_averages(self) -> pd.DataFrame:
# Define a custom function to calculate the median, excluding missing values
def median_without_missing(group):
return group[AVERAGE_FIXED_FEATURES].median(skipna=True)
cleaning_averages = self.data.groupby(
["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
observed=True,
dropna=False
).apply(median_without_missing).reset_index()
general_averages = self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
property_averages = self.data.groupby(["PROPERTY_TYPE"], observed=True).apply(
median_without_missing).reset_index()
built_form_averages = self.data.groupby(["BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
# We can clean up any NA's in the cleaning averages with the general averages here
cleaning_averages_filled = pd.merge(cleaning_averages, general_averages, on=['PROPERTY_TYPE', 'BUILT_FORM'],
suffixes=['', '_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, property_averages, on=['PROPERTY_TYPE'],
suffixes=['', '_PROPERTY_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, built_form_averages, on=['BUILT_FORM'],
suffixes=['', '_BUILT_FORM_AVERAGE'])
# Replace any missing NAN values with averages for the same Property type and built form
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
# If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope
# and built form
# We can use just the property type average and replace
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_PROPERTY_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE', 'FLOOR_HEIGHT_PROPERTY_AVERAGE'])
# If there are still NA values, use BUILT FORM averages
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE', 'FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
# If there still is na values, use average across all properties in consituecy
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA'].mean())
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT'].mean())
# If the consituency is all NA values, then take UK AVERAGE VALUES
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
FLOOR_HEIGHT_NATIONAL_AVERAGE)
return cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
'''
Reduce the data futher by keeping only datasets with multiple epcs
'''
counts = self.data.groupby("UPRN").size().reset_index()
counts.columns = ["UPRN", "count"]
# take UPRNS with multiple EPCs
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on='UPRN')
def recast_df_columns(self, column_mappings: dict) -> None:
"""
Recast columns from the dataframe to ensure the behaviour we want
"""
for key, values in column_mappings.items():
if key not in self.data.columns:
print('Column mapping incorrectly specified')
exit(1)
for value in values:
self.data[key] = self.data[key].astype(value)
def confine_data(self) -> None:
    """
    Apply every row filter that narrows the data down based on our assumptions.

    Filters, applied in order:
      1. UPRN is the unique identifier for a property, so records without one are removed
      2. Records lodged before EARLIEST_EPC_DATE are removed
      3. EPCs conducted for a new build are removed, since these are performed with full SAP,
         which produces different results to the RdSAP methodology
      4. Records with a floor level of "top floor" or "mid floor" are removed, since this is
         ambiguous
    """
    frame = self.data
    frame = frame[frame["UPRN"].notna()]
    frame = frame[frame["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
    frame = frame[frame["TRANSACTION_TYPE"] != "new dwelling"]
    ambiguous_floor_levels = ["top floor", "mid floor"]
    frame = frame[~frame["FLOOR_LEVEL"].isin(ambiguous_floor_levels)]
    self.data = frame
def clean_multi_glaze_proportion(self) -> None:
    """
    Fill in a missing multi-glaze proportion for fully glazed properties.

    Where MULTI_GLAZE_PROPORTION is null but WINDOWS_DESCRIPTION is one of
    FULLY_GLAZED_DESCRIPTIONS, we assume a proportion of 100.
    """
    proportion_missing = self.data["MULTI_GLAZE_PROPORTION"].isna()
    fully_glazed = self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)
    self.data.loc[proportion_missing & fully_glazed, 'MULTI_GLAZE_PROPORTION'] = 100

View file

@ -0,0 +1,22 @@
import logging
def setup_logger():
    """
    Configure the root logger for INFO-level output through a single stream handler.

    The function is idempotent: the handler it installs is tagged with a name, and a repeated
    call re-uses it instead of stacking another handler (which would emit every record twice).

    :return: the configured root logger
    """
    handler_name = "model_data_stream_handler"

    # Create a logger
    logger = logging.getLogger()
    # Set the log level
    logger.setLevel(logging.INFO)

    # Our handler is already installed - nothing more to do
    if any(handler.get_name() == handler_name for handler in logger.handlers):
        return logger

    # Create a formatter
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    # Create a stream handler; with no explicit stream, StreamHandler writes to sys.stderr
    stream_handler = logging.StreamHandler()
    stream_handler.set_name(handler_name)
    stream_handler.setFormatter(formatter)
    # Add the stream handler to the logger
    logger.addHandler(stream_handler)
    return logger


logger = setup_logger()

View file

@ -0,0 +1,123 @@
# Using a simple Python file as settings for now
# TODO: migrate to dynaconf
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE = 70
FLOOR_HEIGHT_NATIONAL_AVERAGE = 2.45
COLUMNS_TO_MERGE_ON = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS"
]
FULLY_GLAZED_DESCRIPTIONS = [
"Fully double glazed",
"High performance glazing",
"Fully triple glazed",
"Full secondary glazing",
"Multiple glazing throughout",
]
FIXED_FEATURES = [
'PROPERTY_TYPE',
'BUILT_FORM',
'CONSTRUCTION_AGE_BAND',
'NUMBER_HABITABLE_ROOMS',
'CONSTITUENCY',
'NUMBER_HEATED_ROOMS',
'FIXED_LIGHTING_OUTLETS_COUNT',
'FLOOR_HEIGHT',
'FLOOR_LEVEL',
'TOTAL_FLOOR_AREA',
]
# EPC columns describing the building components; these are the fields compared between EPCs.
# Every entry must be unique: the list is used directly as a column selector, so a repeated
# name would produce a duplicated column.
COMPONENT_FEATURES = [
    'TRANSACTION_TYPE',
    'WALLS_DESCRIPTION',
    'FLOOR_DESCRIPTION',
    'LIGHTING_DESCRIPTION',
    'ROOF_DESCRIPTION',
    'MAINHEAT_DESCRIPTION',
    'HOTWATER_DESCRIPTION',
    'MAIN_FUEL',
    'MECHANICAL_VENTILATION',
    'SECONDHEAT_DESCRIPTION',
    'ENERGY_TARIFF',  # Not sure if this is relevant
    'SOLAR_WATER_HEATING_FLAG',
    'PHOTO_SUPPLY',
    'WINDOWS_DESCRIPTION',
    'GLAZED_TYPE',
    'MULTI_GLAZE_PROPORTION',
    'LOW_ENERGY_LIGHTING',
    'NUMBER_OPEN_FIREPLACES',
    'MAINHEATCONT_DESCRIPTION',
    'EXTENSION_COUNT',
    # 'GLAZED_AREA',  # May not need this since we have MULTI_GLAZE_PROPORTION
]
# For these fields, we take an average if we have multiple values
AVERAGE_FIXED_FEATURES = [
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT"
]
# For these fields, we take the latest value if we have multiple values.
# Since more recent EPCs have been conducted with more rigour, we assume that the latest value is
# the most accurate
LATEST_FIELD = [
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
"FIXED_LIGHTING_OUTLETS_COUNT",
"FLOOR_LEVEL",
"CONSTRUCTION_AGE_BAND", # This is a field we'll probably want to use Verisk data for
]
# If we see these features changing, we don't use the EPC, since we deem it not to be reliable
MANDATORY_FIXED_FEATURES = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTITUENCY"
]
# For particularly old EPC data, we have inconsistent records, so we only include EPCs that were
# lodged on or after this date. SAP09 was introduced in 2009 and SAP12 was later introduced in
# England and Wales from 31 July 2014, so the cutoff keeps only EPCs lodged from 1 August 2014
EARLIEST_EPC_DATE = "2014-08-01"
RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"
def ordinal(n):
    """
    Return ``n`` followed by its English ordinal suffix, e.g. 1 -> "1st", 12 -> "12th".

    The suffix is chosen from the magnitude of ``n``, so negative numbers read the same way as
    their positive counterparts (-1 -> "-1st"). Python's modulo of a negative number is
    non-negative (``-1 % 10 == 9``), which would otherwise select the wrong suffix.

    :param n: an integer
    :return: the number as a string with "st", "nd", "rd" or "th" appended
    """
    magnitude = abs(n)
    # Numbers ending in 10-20 always take "th" (11th, 12th, 13th), despite ending in 1, 2 or 3
    if 10 <= magnitude % 100 <= 20:
        suffix = 'th'
    else:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(magnitude % 10, 'th')
    return str(n) + suffix
# Lookup used to replace FLOOR_LEVEL values with an integer floor number
FLOOR_LEVEL_MAP = {
    "Basement": -1,
    "Ground": 0,
    "ground floor": 0,
    "20+": 20,
    "21st or above": 21,
    # Zero-padded strings: "00", "01", ... "20"
    **{f"{level:02d}": level for level in range(21)},
    # Ordinal strings as produced by ordinal()
    **{ordinal(level): level for level in range(-1, 21)},
    # Plain numeric strings: "-1", "0", ... "20"
    **{str(level): level for level in range(-1, 21)},
    # Values that are already integers map to themselves
    **{level: level for level in range(-1, 21)},
}
# Collapse the "Enclosed" terrace variants of BUILT_FORM into their plain terrace equivalents
BUILT_FORM_REMAP = {
"Enclosed End-Terrace": "End-Terrace",
"Enclosed Mid-Terrace": "Mid-Terrace",
}
# Settings for the data processor.
# NOTE(review): presumably 'low_memory' is forwarded to pd.read_csv, 'epc_minimum_count' to
# retain_multiple_epc_properties and 'column_mappings' to recast_df_columns - confirm against
# the DataProcessor class. Each column in 'column_mappings' lists its casts in order, i.e.
# UPRN is cast to int first and then to str.
DATA_PROCESSOR_SETTINGS = {
'low_memory': False,
'epc_minimum_count': 1,
'column_mappings': {'UPRN': [int, str]}
}

View file

@ -1,108 +1,142 @@
import numpy as np
import os
import pandas as pd
from tqdm import tqdm
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from pathlib import Path
from model_data.simulation_system.Settings import (
MANDATORY_FIXED_FEATURES,
AVERAGE_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON,
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP
)
from DataProcessor import DataProcessor
def list_subdirectories(directory_path):
    """Return the names (not full paths) of the immediate sub-directories of ``directory_path``."""
    subdirectories = []
    for entry in os.listdir(directory_path):
        if os.path.isdir(os.path.join(directory_path, entry)):
            subdirectories.append(entry)
    return subdirectories
DATA_DIRECTORY = os.getcwd() + '/model_data/simulation_system/data/all-domestic-certificates'
FIXED_FEATURES = [
'PROPERTY_TYPE',
'BUILT_FORM',
'CONSTRUCTION_AGE_BAND',
'NUMBER_HABITABLE_ROOMS',
'CONSTITUENCY',
'NUMBER_HEATED_ROOMS',
'FIXED_LIGHTING_OUTLETS_COUNT',
'GLAZED_AREA',
'FLOOR_HEIGHT',
'FLOOR_LEVEL',
'TOTAL_FLOOR_AREA',
]
COMPONENT_FEATURES = [
'TRANSACTION_TYPE',
'WALLS_DESCRIPTION',
'FLOOR_DESCRIPTION',
'LIGHTING_DESCRIPTION',
'ROOF_DESCRIPTION',
'MAINHEAT_DESCRIPTION',
'HOTWATER_DESCRIPTION',
'MAIN_FUEL',
'MECHANICAL_VENTILATION',
'SECONDHEAT_DESCRIPTION',
'ENERGY_TARIFF', # Not sure if this is relevant
'SOLAR_WATER_HEATING_FLAG',
'PHOTO_SUPPLY',
'WINDOWS_DESCRIPTION',
'GLAZED_TYPE',
'MULTI_GLAZE_PROPORTION',
'LIGHTING_DESCRIPTION',
'LOW_ENERGY_LIGHTING',
'NUMBER_OPEN_FIREPLACES',
'MAINHEATCONT_DESCRIPTION',
'EXTENSION_COUNT'
]
AVERAGE_FIXED_FEATURES = [
"TOTAL_FLOOR_AREA"
]
DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'
def app():
# Get all the files in the directory
directories = list_subdirectories(DATA_DIRECTORY)
# Data glossary:
# https://epc.opendatacommunities.org/docs/guidance#glossary
# List all subdirectories
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
# 116
# 128048706
# PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic
# -certificates/domestic-E09000021-Kingston-upon-Thames')
for directory in tqdm(directories):
filepath = os.path.join(DATA_DIRECTORY, directory, "certificates.csv")
df = pd.read_csv(filepath, low_memory=False)
df = df[~pd.isnull(df["UPRN"])]
df["UPRN"] = df["UPRN"].astype(int).astype(str)
counts = df.groupby("UPRN").size().reset_index()
counts.columns = ["UPRN", "count"]
counts = counts.sort_values("count", ascending=False)
# take UPRNS with multiple EPCs
counts = counts[counts["count"] > 1]
df = df[df["UPRN"].isin(counts["UPRN"])]
df = df.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
filepath = directory / "certificates.csv"
for uprn, property_data in df.groupby("UPRN"):
data_processor = DataProcessor(filepath=filepath)
df = data_processor.pre_process()
cleaning_averages = data_processor.make_cleaning_averages()
for uprn, property_data in df.groupby("UPRN", observed=True):
# Fixed features - these are property attributes that shouldn't change over time
fixed_data = {}
for field in FIXED_FEATURES:
vals = property_data[field].dropna().unique()
# Remove invalid values
vals = [v for v in vals if v not in BaseUtility.DATA_ANOMALY_MATCHES]
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if max(property_data[MANDATORY_FIXED_FEATURES].nunique()) > 1:
continue
# Take the latest row for both the LATEST_FIELD and MANDATORY_FIXED_FEATURES columns
latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict()
mandatory_field_data = property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
# Taking just the last row, which is the percentage change from the latest to previous one only
# property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1
# Extract the columns that are not all None
na_columns = property_data[COLUMNS_TO_MERGE_ON].isna().all()
cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list()
# Get the corresponding groupby and merge, and fill in NA values
cleaning_averages_to_merge = cleaning_averages.groupby(cleaned_columns_to_merge_on)[
['TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT']].mean()
modified_property_data = pd.merge(property_data, cleaning_averages_to_merge, on=cleaned_columns_to_merge_on,
suffixes=['', '_AVERAGE'])
modified_property_data['TOTAL_FLOOR_AREA'] = modified_property_data['TOTAL_FLOOR_AREA'].fillna(
modified_property_data['TOTAL_FLOOR_AREA_AVERAGE'])
modified_property_data['FLOOR_HEIGHT'] = modified_property_data['FLOOR_HEIGHT'].fillna(
modified_property_data['FLOOR_HEIGHT_AVERAGE'])
modified_property_data = modified_property_data.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
for field in AVERAGE_FIXED_FEATURES:
vals = list(modified_property_data[field].dropna().unique())
if len(vals) > 1:
raise ValueError("Fixed feature {} has more than one value - fix me".format(field))
if field in AVERAGE_FIXED_FEATURES:
# Check the values are too far apart
# TODO: we could have multiple values here, why only use the first two?
if abs(vals[0] - vals[1]) / vals[0] > 0.1:
raise ValueError("Large deviation in fixed feature {} - fix me".format(field))
# Take the more recent value since it's likely to be more accurate
vals = [vals[-1]]
field_value = np.mean(vals)
else:
field_value = vals[0] if vals else None
if len(vals) == 0:
wrong_var
fixed_data[field] = field_value
fixed_data[field] = np.mean(vals)
variable_data = property_data[COMPONENT_FEATURES]
# Combine all fields together
fixed_data.update(mandatory_field_data)
fixed_data.update(latest_field_data)
for idx in range(0, property_data.shape[0] - 1):
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = modified_property_data[
COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]
]
if idx >= property_data.shape[0] - 1:
# Note: we look at changes between subsequent EPCS, however we could look at other permutations
# e.g. first vs second, second vs third and also first vs third
property_model_data = []
for idx in range(0, modified_property_data.shape[0] - 1):
if idx >= modified_property_data.shape[0] - 1:
break
starting_record = variable_data.iloc[idx]
ending_record = variable_data.iloc[idx + 1]
rdsap_change = ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE]
heat_demand_change = ending_record[HEAT_DEMAND_RESPONSE] - starting_record[HEAT_DEMAND_RESPONSE]
# TODO: We need to pre-process the data. For instance, rather than using static for roofs, walls and
# floors, we may want to use the U-value. We may also want to handle the (assumed) tags
# within descriptions
starting_record = starting_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
ending_record = ending_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
features = pd.concat([starting_record, ending_record])
property_model_data.append(
{
"UPRN": uprn,
"RDSAP_CHANGE": rdsap_change,
"HEAT_DEMAND_CHANGE": heat_demand_change,
**fixed_data,
**features.to_dict()
}
)
dataset.extend(property_model_data)
output = pd.DataFrame(dataset)
output.to_parquet('./dataset.parquet')
if __name__ == "__main__":
app()

View file

@ -0,0 +1,118 @@
from pathlib import Path
from Settings import (
RDSAP_RESPONSE,
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
EARLIEST_EPC_DATE,
FULLY_GLAZED_DESCRIPTIONS,
FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES
)
from model_data.BaseUtility import Definitions
from tqdm import tqdm
import pandas as pd
import numpy as np
from autogluon.tabular import TabularDataset, TabularPredictor
RANDOM_SEED = 0
DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'
FLOAT_COLUMNS = [
'NUMBER_OPEN_FIREPLACES',
'EXTENSION_COUNT',
'TOTAL_FLOOR_AREA',
'PHOTO_SUPPLY',
'FIXED_LIGHTING_OUTLETS_COUNT',
'FLOOR_HEIGHT',
'NUMBER_HABITABLE_ROOMS',
'LOW_ENERGY_LIGHTING',
'MULTI_GLAZE_PROPORTION',
'NUMBER_HEATED_ROOMS'
]
def create_raw_data():
    """
    Extract all information to do a simple predictor for RDSAP.

    Reads ``certificates.csv`` from every sub-directory of DATA_DIRECTORY, filters and cleans
    each file, and concatenates the results. Two parquet files are written to the current
    working directory:

    * ``energy_predictor_data.parquet`` - every retained row
    * ``energy_predictor_cleaned_data.parquet`` - only the rows without any missing value
    """
    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]

    # Loop-invariant lookups: build them once rather than once per directory.
    # Every anomaly marker maps to None so that replace() blanks it out.
    data_anomaly_map = dict.fromkeys(Definitions.DATA_ANOMALY_MATCHES)
    modelling_columns = [RDSAP_RESPONSE] + list(set(FIXED_FEATURES + LATEST_FIELD + COMPONENT_FEATURES))

    dfs = []
    for directory in tqdm(directories):
        filepath = directory / "certificates.csv"
        df = pd.read_csv(filepath, low_memory=False)

        # Remove any bad uprns and ignore old/bad data
        df = df[~pd.isnull(df["UPRN"])]
        df = df[df["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
        df = df[df["TRANSACTION_TYPE"] != "new dwelling"]
        df = df[~df["FLOOR_LEVEL"].isin(["top floor", "mid floor"])]

        # Change multi glaze proportion: fully glazed windows without a proportion count as 100
        no_multi_glaze_proportion_index = pd.isnull(df["MULTI_GLAZE_PROPORTION"]) & (
            df["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
        df.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100

        # Recast
        df["UPRN"] = df["UPRN"].astype(int).astype(str)
        df['MAIN_HEATING_CONTROLS'] = df['MAIN_HEATING_CONTROLS'].astype(float)

        # Sort Data
        df = df.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)

        # Remove invalid values by mapping every anomaly marker to None
        df = df.replace(data_anomaly_map)
        # np.nan rather than the np.NAN alias, which was removed in NumPy 2.0
        df = df.replace(np.nan, None)

        # Remap certain columns. The remap must be written back to BUILT_FORM itself: that is
        # the column kept for modelling below, so writing to any other name discards the remap.
        df['FLOOR_LEVEL'] = df['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
        df['BUILT_FORM'] = df['BUILT_FORM'].replace(BUILT_FORM_REMAP)

        # Keep only possible modelling columns
        df = df[modelling_columns]

        # Reduce memory usage
        df[RDSAP_RESPONSE] = pd.to_numeric(df[RDSAP_RESPONSE], downcast='unsigned')
        df[FLOAT_COLUMNS] = df[FLOAT_COLUMNS].apply(pd.to_numeric, downcast='float')

        dfs.append(df)

    data = pd.concat(dfs)
    data.to_parquet('./energy_predictor_data.parquet')

    # Dropping every row with a missing value gives you primarily flats
    cleaned_data = data.dropna()
    cleaned_data.to_parquet('./energy_predictor_cleaned_data.parquet')
def main():
    """
    Fit and evaluate an AutoGluon regressor for the RdSAP response.

    Trains on a 1% random sample of the train/validation parquet file, then evaluates the
    model, predicts and computes feature importance on the test parquet file.
    """
    train_path = './model_build_data/energy_data/cleaned_data/train_validation_data.parquet'
    test_path = './model_build_data/energy_data/cleaned_data/test_data.parquet'

    data = TabularDataset(data=train_path)
    # Train on one hundredth of the rows
    subsample_size = round(len(data) / 100)
    data = data.sample(subsample_size, random_state=RANDOM_SEED)

    fit_options = {
        'time_limit': 800,
        'presets': 'high_quality',
        'excluded_model_types': ['KNN', 'CAT'],
    }
    predictor_RDSAP = TabularPredictor(
        label=RDSAP_RESPONSE,
        path="agModels-predictENERGY",
        problem_type="regression",
        eval_metric='mean_absolute_error'
    ).fit(data, **fit_options)

    test_data = TabularDataset(test_path)
    performance = predictor_RDSAP.evaluate(test_data)
    predictions = predictor_RDSAP.predict(test_data)
    predictor_RDSAP.feature_importance(test_data)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,77 @@
from Logger import logger
import argparse
import pandas as pd
from pathlib import Path
RANDOM_SEED = 0
def ingest_arguments() -> argparse.Namespace:
    """
    Parse the command-line options supplied when the script is started.

    :return: namespace with ``filepath``, ``output_folder``, ``percentage``, ``volume`` and
        ``sampling`` attributes
    """
    cli = argparse.ArgumentParser(description='Inputs for training script')
    cli.add_argument('--filepath', required=True, type=str, help='Location of Parquet dataset to load')
    cli.add_argument('--output-folder', required=True, type=str, help='Location of Parquet dataset to save')
    cli.add_argument('--percentage', default=None, type=float, help='Percentage of data to use as test data')
    cli.add_argument('--volume', default=None, type=int, help='Volume of data to use as test data')
    cli.add_argument(
        '--sampling',
        default='random',
        type=str,
        choices=['random', 'stratified'],
        help='Type of sampling to do for test data',
    )
    return cli.parse_args()
def main(filepath: str, output_folder: str, percentage: float, volume: int, sampling: str):
    """
    Load a dataset in and split out the training+validation data and the test data.

    :param filepath: location of the parquet dataset to load
    :param output_folder: folder that receives ``train_validation_data.parquet`` and
        ``test_data.parquet``
    :param percentage: share of the rows to hold out as test data, expressed as a fraction
        (the value is multiplied directly by the number of rows), or None
    :param volume: absolute number of rows to hold out as test data, or None
    :param sampling: sampling method; only ``'random'`` is implemented
    :raises NotImplementedError: if ``sampling`` is anything other than ``'random'``
    :raises SystemExit: with exit code 1 if neither ``percentage`` nor ``volume`` is given
    """
    # Fail before doing any work: an unimplemented method would otherwise only surface as an
    # unbound-variable error when the (never created) splits are saved
    if sampling != 'random':
        raise NotImplementedError(f"Sampling method '{sampling}' is not yet implemented")

    logger.info('---Loading Data---')
    data = pd.read_parquet(filepath).reset_index(drop=True)

    # Compare against None explicitly so that a legitimate 0 / 0.0 is not mistaken for "unset"
    if percentage is None and volume is None:
        logger.error('No amount specified - please specify either a percentage or volume')
        raise SystemExit(1)
    if volume is None:
        test_amount = round(len(data) * percentage)
    elif percentage is None:
        test_amount = volume
    else:
        logger.info('Both percentage and volume specified - taking largest of the two')
        test_amount = max(round(len(data) * percentage), volume)

    logger.info(f'---Extracting {test_amount} from dataset to be test data')
    logger.info('--- Using random sample method ---')
    sample_index = data.sample(n=test_amount, random_state=RANDOM_SEED).index
    train_validation_data = data.drop(sample_index)
    # sample_index holds index labels, so select by label rather than by position
    test_data = data.loc[sample_index]

    logger.info('--- Saving data ---')
    train_validation_data.to_parquet(Path(output_folder) / 'train_validation_data.parquet')
    test_data.to_parquet(Path(output_folder) / 'test_data.parquet')
    logger.info(' ---Pipeline complete---')
if __name__ == "__main__":
logger.info('--- Generate test data pipeline ---')
args = ingest_arguments()
main(
filepath=args.filepath,
output_folder=args.output_folder,
percentage=args.percentage,
volume=args.volume,
sampling=args.sampling
)

View file

@ -0,0 +1,143 @@
import os
import pandas as pd
import argparse
from typing import List
from Logger import logger
from autogluon.tabular import TabularDataset, TabularPredictor
DROP_COLUMNS = ['UPRN', 'HEAT_DEMAND_CHANGE']
FEATURE_COLUMNS = None
RANDOM_SEED = 0
# FOR TESTING
train_filepath = "./model_build_data/train_validation_data.parquet"
test_filepath = "./model_build_data/test_data.parquet"
def ingest_arguments() -> argparse.Namespace:
    """
    Parse the command-line options supplied when the script is started.

    :return: namespace with ``train_filepath`` and ``test_filepath`` attributes (None when the
        corresponding option is not given)
    """
    cli = argparse.ArgumentParser(description='Inputs for training script')
    option_help = {
        '--train-filepath': 'Location of Parquet dataset to load for training',
        '--test-filepath': 'Location of Parquet dataset to load for testing',
    }
    for flag, help_text in option_help.items():
        cli.add_argument(flag, type=str, help=help_text)
    return cli.parse_args()
class DataLoader():
    """Load tabular datasets from disk into pandas DataFrames."""

    @staticmethod
    def load(filepath: str) -> pd.DataFrame:
        """
        Load different datasets

        :param filepath: path to a ``.parquet`` or ``.csv`` file; the extension is matched
            case-insensitively and path-like objects are accepted
        :return: the file contents as a DataFrame
        :raises SystemExit: with exit code 1 when the file extension is not supported
        """
        path = str(filepath)
        lowered_path = path.lower()
        if lowered_path.endswith('.parquet'):
            return pd.read_parquet(path)
        if lowered_path.endswith('.csv'):
            return pd.read_csv(path)
        logger.error('Not implemented!')
        raise SystemExit(1)
class FeatureProcessor:
    """
    Handle all feature manipulation before modelling
    """

    @staticmethod
    def drop_columns(df: pd.DataFrame, drop_columns=None) -> pd.DataFrame:
        """
        Return ``df`` without the given columns.

        :param df: input data
        :param drop_columns: a single column name or a list of column names; when omitted,
            the module-level DROP_COLUMNS is used (resolved at call time rather than bound as
            a shared mutable default)
        :return: a new DataFrame without those columns
        """
        if drop_columns is None:
            drop_columns = DROP_COLUMNS
        # Accept a single name as well as a list; never nest a list inside another list
        if isinstance(drop_columns, str):
            drop_columns = [drop_columns]
        return df.drop(columns=list(drop_columns))

    @staticmethod
    def retain_features(df: pd.DataFrame, features: List[str] = None):
        """
        Determine which columns to keep for modelling

        :param df: input data
        :param features: columns to keep; None keeps every column
        :return: ``df`` restricted to the requested columns
        :raises SystemExit: with exit code 1 if a requested feature is not a column of ``df``
        """
        if features is None:
            features = df.columns
        elif not set(features).issubset(df.columns):
            logger.error('Features defined is not contained in data')
            raise SystemExit(1)
        return df[features]

    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop DROP_COLUMNS from ``df`` and then keep only FEATURE_COLUMNS (all if None)."""
        df = self.drop_columns(df, drop_columns=DROP_COLUMNS)
        df = self.retain_features(df, features=FEATURE_COLUMNS)
        return df
def training(train_filepath: str, test_filepath: str) -> None:
    """
    Pipeline to run training on the dataset

    Fits an AutoGluon regressor for ``RDSAP_CHANGE`` on a 1-in-20 random sample of the training
    data, restricted to the columns whose names start with one of the top-feature prefixes,
    then evaluates it on the test data.

    :param train_filepath: location of the parquet dataset used for training
    :param test_filepath: location of the parquet dataset used for evaluation
    """
    logger.info('Loading data')
    dataloader = DataLoader()
    train_df = dataloader.load(filepath=train_filepath)
    test_df = dataloader.load(filepath=test_filepath)

    logger.info('Feature processing')
    feature_processor = FeatureProcessor()
    # NOTE(review): the processed frames are not yet fed into the model below, which still
    # reads the training file directly - confirm whether they should replace it
    train_df = feature_processor.process(train_df)
    test_df = feature_processor.process(test_df)

    logger.info('Build Model')
    target_column = 'RDSAP_CHANGE'
    data = TabularDataset(data=train_filepath)
    data = data.drop(columns=['UPRN', 'HEAT_DEMAND_CHANGE'])

    # Keep the target plus every column whose name starts with one of these prefixes
    top_feature_prefixes = ['MAINHEAT', 'ROOF', 'WALLS', 'MAINHEATCONT', 'PHOTO', 'HOTWATER', 'SECONDHEAT']
    top_features = data.columns[data.columns.str.startswith(tuple(top_feature_prefixes))]
    data = data[[target_column] + top_features.to_list()]

    subsample_size = round(len(data) / 20)
    data = data.sample(subsample_size, random_state=RANDOM_SEED)

    # Add custom metric class MAPE
    # Have a look at temporal features
    predictor_RDSAP = TabularPredictor(
        label=target_column,
        path="agModels-predictRDSAP",
        problem_type="regression",
        eval_metric='mean_absolute_error'
    ).fit(data, time_limit=200, presets='best_quality', excluded_model_types=['KNN'])

    logger.info('Evaluate matrics')
    # Evaluate on the file the caller asked for rather than a hard-coded location
    test_data = TabularDataset(test_filepath)
    performance = predictor_RDSAP.evaluate(test_data)
    predictions = predictor_RDSAP.predict(test_data)

    test_data['predictions'] = predictions
    test_data['diff'] = abs(test_data[target_column] - test_data['predictions'])
if __name__ == "__main__":
logger.info('---Begin Pipeline---')
logger.info('---Ingest Arguments---')
args = ingest_arguments()
training(train_filepath=args.train_filepath, test_filepath=args.test_filepath)

View file

@ -36,7 +36,7 @@ class TestCleanFloor:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known floor types or thermal transmittance",
"description with no known floor data_types or thermal transmittance",
]
for description in invalid_descriptions:

View file

@ -29,7 +29,7 @@ class TestHotWaterAttributes:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known hotwater types",
"description with no known hotwater data_types",
""
]

View file

@ -29,7 +29,7 @@ class TestMainHeatControlAttributes:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known fuel types",
"description with no known fuel data_types",
]
for description in invalid_descriptions:

View file

@ -34,7 +34,7 @@ class TestMainHeatAttributes:
invalid_descriptions = [
"",
"invalid description",
"description with no known heating types",
"description with no known heating data_types",
]
for description in invalid_descriptions:

View file

@ -29,7 +29,7 @@ class TestMainHeatControlAttributes:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known heating control types",
"description with no known heating control data_types",
]
for description in invalid_descriptions:

View file

@ -24,3 +24,57 @@ def correct_spelling(text):
corrected_text = ' '.join(corrected_words)
return corrected_text
# Lowest SAP score of every EPC band, ordered from the best rating to the worst.
# Bands: A 92+, B 81-91, C 69-80, D 55-68, E 39-54, F 21-38, G 1-20.
_EPC_BAND_LOWER_BOUNDS = (
    ("A", 92),
    ("B", 81),
    ("C", 69),
    ("D", 55),
    ("E", 39),
    ("F", 21),
    ("G", 1),
)


def sap_to_epc(sap_points: int):
    """
    Simple utility function to convert SAP points to EPC rating.

    :param sap_points: numerical value of SAP points, between 1 and 100
    :return: the EPC band as a single letter between "A" and "G"
    :raises ValueError: if ``sap_points`` is not greater than 0 or is above 100
    """
    if sap_points <= 0 or sap_points > 100:
        raise ValueError("SAP points should be between 1 and 100.")

    for band, lower_bound in _EPC_BAND_LOWER_BOUNDS:
        # "Strictly above the top of the band below" - identical to ``>= lower_bound`` for
        # whole scores, and it keeps fractional scores on the same side of each boundary
        if sap_points > lower_bound - 1:
            return band
    # Only reachable for fractional scores between 0 and 1
    return "G"


def epc_to_sap_lower_bound(epc: str):
    """
    Given an EPC rating, returns the lower bound SAP score required
    to hit that EPC rating

    :param epc: EPC rating, between A and G
    :return: the lowest SAP score that falls inside that band
    :raises ValueError: if ``epc`` is not one of the letters A to G
    """
    for band, lower_bound in _EPC_BAND_LOWER_BOUNDS:
        if epc == band:
            return lower_bound
    raise ValueError("EPC rating should be between A and G")

View file

@ -1,11 +1,12 @@
import math
from typing import List
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from datatypes.enums import QuantityUnits
from backend.Property import Property
from recommendations.rdsap_tables import default_wall_thickness, age_band_data
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
get_recommended_part, get_uvalue_estimate
get_recommended_part, get_uvalue_estimate, estimate_sap_points
)
suspended_floor_insulation_parts = [
@ -13,7 +14,7 @@ suspended_floor_insulation_parts = [
# Example product
# https://www.insulationsuperstore.co.uk/product/recticel-eurothane-general-purpose-pir-insulation-board-2400
# -x-1200-x-100mm.html
# All product types here:
# All product data_types here:
# https://www.insulationsuperstore.co.uk/browse/insulation/brand/recticel/filterby/application/floors.html
"type": "suspended_floor_insulation",
"description": "Rigid Insulation Foam Boards",
@ -29,7 +30,7 @@ suspended_floor_insulation_parts = [
{
# Example product
# https://www.insulationsuperstore.co.uk/product/rockwool-rwa45-acoustic-insulation-slab-100mm-2-88m2-pack.html
# All product types here:
# All product data_types here:
# https://www.insulationsuperstore.co.uk/browse/insulation/brand/rockwool/filterby/application/floors
# /material/mineral-wool.html
"type": "suspended_floor_insulation",
@ -49,7 +50,7 @@ solid_floor_insulation_parts = [
{
# Example product
# https://www.insulationexpress.co.uk/floor-insulation/solid-floor-insulation/k103-100mm
# All product types here:
# All product data_types here:
# https://www.insulationexpress.co.uk/floor-insulation/solid-floor-insulation?brand=7015&p=1
# Example screed https://www.screwfix.com/p/mapei-ultraplan-3240-self-levelling-compound-25kg/4959f
"type": "solid_floor_insulation",
@ -69,7 +70,7 @@ solid_floor_insulation_parts = [
parts = suspended_floor_insulation_parts + solid_floor_insulation_parts
class FloorRecommendations(BaseUtility):
class FloorRecommendations(Definitions):
# part L building regulations indicate that any rennovations on an existing property's walls should
# achieve a U-value of no higher than 0.3
BUILDING_REGULATIONS_PART_L_MAX_U_VALUE = 0.25
@ -116,6 +117,13 @@ class FloorRecommendations(BaseUtility):
else:
self.materials = parts
self.suspended_floor_insulation_parts = [
part for part in self.materials if part["type"] == "suspended_floor_insulation"
]
self.solid_floor_insulation_parts = [
part for part in self.materials if part["type"] == "solid_floor_insulation"
]
@staticmethod
def _estimate_perimeter(floor_area, num_rooms):
# Compute average room size based on total floor area and number of rooms
@ -266,11 +274,15 @@ class FloorRecommendations(BaseUtility):
if is_suspended:
# Given the U-value, we recommend underfloor insulation
self.recommend_floor_insulation(u_value=u_value, parts=suspended_floor_insulation_parts)
self.recommend_floor_insulation(u_value=u_value, parts=self.suspended_floor_insulation_parts)
if is_solid:
# Given the U-value, we recommend solid floor insulation options which are usually solid foam
self.recommend_floor_insulation(u_value=u_value, parts=solid_floor_insulation_parts)
self.recommend_floor_insulation(u_value=u_value, parts=self.solid_floor_insulation_parts)
@staticmethod
def _make_floor_description(part, depth):
return f"Install {depth}{part['depth_unit']} {part['description']} insulation"
def recommend_floor_insulation(self, u_value, parts):
"""
@ -280,7 +292,8 @@ class FloorRecommendations(BaseUtility):
lowest_selected_u_value = None
for part in parts:
for depth in part["depths"]:
for depth, cost_per_unit in zip(part["depths"], part["cost"]):
part_u_value = r_value_per_mm_to_u_value(depth, part["r_value_per_mm"])
_, new_u_value = calculate_u_value_uplift(u_value, part_u_value)
new_u_value = math.ceil(new_u_value * 100.0) / 100.0
@ -293,12 +306,25 @@ class FloorRecommendations(BaseUtility):
if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
estimated_cost = cost_per_unit * self.property.floor_area
self.recommendations.append(
{
"parts": [
get_recommended_part(part, depth),
get_recommended_part(
part=part,
selected_depth=depth,
quantity=self.property.floor_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=estimated_cost
),
],
"type": "floor_insulation",
"description": self._make_floor_description(part, depth),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": estimate_sap_points(),
"cost": estimated_cost,
}
)

View file

@ -1,11 +1,12 @@
import itertools
import math
from datatypes.enums import QuantityUnits
from backend.Property import Property
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
get_recommended_part, get_uvalue_estimate
get_recommended_part, get_uvalue_estimate, estimate_sap_points
)
external_wall_insulation_parts = [
@ -184,7 +185,7 @@ internal_wall_insulation_parts = [
wall_parts = external_wall_insulation_parts + internal_wall_insulation_parts
class WallRecommendations(BaseUtility):
class WallRecommendations(Definitions):
YEAR_WALLS_BUILT_WITH_INSULATION = 1990
# After 1930, Solid brick walls became less populate and instead, cavity walls became a
# more popular choice
@ -310,7 +311,8 @@ class WallRecommendations(BaseUtility):
recommendations = []
for part in parts:
for depth in part["depths"]:
for depth, cost_per_unit in zip(part["depths"], part["cost"]):
part_u_value = r_value_per_mm_to_u_value(depth, part["r_value_per_mm"])
_, new_u_value = calculate_u_value_uplift(u_value, part_u_value)
@ -331,10 +333,25 @@ class WallRecommendations(BaseUtility):
if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
estimated_cost = cost_per_unit * self.property.insulation_wall_area
recommendations.append(
{
"parts": [get_recommended_part(part, depth)],
"parts": [
get_recommended_part(
part=part,
selected_depth=depth,
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=estimated_cost
)
],
"type": "wall_insulation",
"description": "Install " + self._make_description(part, depth),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": estimate_sap_points(),
"cost": estimated_cost,
}
)
@ -367,7 +384,10 @@ class WallRecommendations(BaseUtility):
# By looping through ewi first, if there is nothing there, that ensures not combinations are tested
for ewi_part in ewi_parts:
for iwi_part in iwi_parts:
for ewi_depth, iwi_depth in itertools.product(ewi_part["depths"], iwi_part["depths"]):
for (ewi_depth, ewi_cost_per_unit), (iwi_depth, iwi_cost_per_unit) in itertools.product(
zip(ewi_part["depths"], ewi_part["cost"]),
zip(iwi_part["depths"], iwi_part["cost"])
):
ewi_part_u_value = r_value_per_mm_to_u_value(ewi_depth, ewi_part["r_value_per_mm"])
iwi_part_u_value = r_value_per_mm_to_u_value(iwi_depth, iwi_part["r_value_per_mm"])
@ -385,17 +405,44 @@ class WallRecommendations(BaseUtility):
if combined_new_u_value - self.U_VALUE_ERROR <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
# Here you might want to define a way to add both recommendations together.
# For now, I'm adding them as separate items in the list
ewi_esimtated_cost = ewi_cost_per_unit * self.property.insulation_wall_area
iwi_esimtated_cost = iwi_cost_per_unit * self.property.insulation_wall_area
recommendation = {
"parts": [
get_recommended_part(ewi_part, ewi_depth),
get_recommended_part(iwi_part, iwi_depth)
get_recommended_part(
part=ewi_part,
selected_depth=ewi_depth,
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=ewi_esimtated_cost
),
get_recommended_part(
part=iwi_part,
selected_depth=iwi_depth,
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=iwi_esimtated_cost
)
],
"type": "wall_insulation",
"description": (
"Install " + self._make_description(ewi_part, ewi_depth) + " and " +
self._make_description(iwi_part, iwi_depth)
),
"starting_u_value": u_value,
"new_u_value": combined_new_u_value,
"sap_points": estimate_sap_points(),
"cost": ewi_esimtated_cost + iwi_esimtated_cost,
}
self.recommendations.append(recommendation)
self.prune_diminishing_recommendations()
@staticmethod
def _make_description(part, depth):
return f"{depth}{part['depth_unit']} {part['description']}"
def prune_diminishing_recommendations(self):
# For any recommendations, if we have at least 1 recommendation that does not exhibit diminishing returns
# we trim all others that are beyond the diminishing returns threshold

View file

@ -3,6 +3,15 @@ from backend.Property import Property
from statistics import mean
def estimate_sap_points():
    """
    Placeholder for the SAP points estimate attached to a recommendation.

    The proper implementation is still to come, so every call yields the same
    fixed value.

    :return: the constant placeholder value 999
    """
    placeholder_sap_points = 999
    return placeholder_sap_points
def r_value_per_mm_to_u_value(depth_mm: int, r_value_per_mm: float):
"""
Converts R-value per mm to U-value in W/m²K.
@ -101,15 +110,21 @@ def update_lowest_selected_u_value(lowest_selected_u_value, new_u_value):
return lowest_selected_u_value
def get_recommended_part(part, selected_depth, selected_total_cost, quantity, quantity_unit):
    """
    Utility function to return a recommended part with the selected depth.

    The input part is deep-copied before being modified, so the caller's part
    definition (including its full list of depths) is left untouched.

    :param part: part to be recommended
    :param selected_depth: depth of the selected part
    :param selected_total_cost: Total cost of the selected part
    :param quantity: Quantity of the selected part
    :param quantity_unit: Unit of the quantity
    :return: a copy of ``part`` whose "depths" entry holds only ``selected_depth`` and which
        additionally carries "estimated_cost", "quantity" and "quantity_unit"
    """
    recommended_part = deepcopy(part)
    # Narrow the available depths down to the single one that was chosen
    recommended_part["depths"] = [selected_depth]
    recommended_part["estimated_cost"] = selected_total_cost
    recommended_part["quantity"] = quantity
    recommended_part["quantity_unit"] = quantity_unit

    return recommended_part

View file

@ -46,6 +46,7 @@ package:
- 'model_data/EpcClean.py'
- 'model_data/utils.py'
- 'model_data/epc_attributes/**'
- 'datatypes/**'
- '!infrastructure/**'
- '!data_collection/**'
- '!node_modules/**'