This commit is contained in:
Jun-te Kim 2026-05-07 15:55:40 +00:00
parent 24ec68bb9f
commit 4f45eeb3e9
6 changed files with 345 additions and 271 deletions

View file

@ -31,17 +31,19 @@ from recommendations.recommendation_utils import (
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from dotenv import load_dotenv
# from dotenv import load_dotenv
logger = setup_logger()
load_dotenv(dotenv_path="../backend/.env")
# load_dotenv(dotenv_path="../backend/.env")
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
def __init__(self, standard_values, standard_map=None, max_tokens=1000):
def __init__(
self, standard_values, standard_map=None, max_tokens=1000, api_key=None
):
"""
Initialize the remapper with standard values and a predefined mapping.
@ -75,7 +77,8 @@ class DataRemapper:
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
}
self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
print(f"DATA REMAPPER api key is {api_key}")
self.openai_client = OpenAI(api_key=api_key)
@staticmethod
def clean_string(text):
@ -136,12 +139,20 @@ class DataRemapper:
raise ValueError("Input tokens exceed the maximum limit.")
logger.info("Calling OpenAI API for standardization...")
response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.max_tokens,
temperature=0.1,
)
try:
response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.max_tokens,
temperature=0.1,
)
except Exception as e:
print(f"[debug] OpenAI call failed. type={type(e).__name__}")
print(f"[debug] status={getattr(e, 'status_code', None)}")
print(f"[debug] body={getattr(e, 'response', None) and e.response.text}")
print(f"[debug] model={self.ai_model}")
raise
output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens
@ -504,6 +515,7 @@ class AssetList:
landlord_block_reference=None,
phase=False,
header=0,
openai_api_key=None,
):
self.local_filepath = local_filepath
self.sheet_name = sheet_name
@ -529,6 +541,7 @@ class AssetList:
self.ecosurv = None
self.ecosurv_no_match = pd.DataFrame()
self.geographical_areas = pd.DataFrame()
self.openai_api_key = openai_api_key
# When this is True, we intend to break the programme into multiple phases. We may need to review
# how this is structured in the future, as depending on how we get future data, we may need to
@ -1107,6 +1120,7 @@ class AssetList:
remapper = DataRemapper(
standard_values=config["standard_values"],
standard_map=config["standard_map"],
api_key=self.openai_api_key,
)
remap_dictionary = remapper.standardize_list(
values_to_remap=values_to_remap.tolist()
@ -1296,8 +1310,8 @@ class AssetList:
self.standardised_asset_list[
self.ATTRIBUTE_HAS_SOLAR
] = self.standardised_asset_list[
self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]
] | ~self.standardised_asset_list[
self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]
] | ~self.standardised_asset_list[
self.EPC_API_DATA_NAMES["photo-supply"]
].isin(
["0.0", 0, None, "", np.nan]
@ -1315,7 +1329,7 @@ class AssetList:
property_type=(
str(x[self.STANDARD_PROPERTY_TYPE]).title()
if str(x[self.STANDARD_PROPERTY_TYPE]).title()
in accepted_epc_property_types
in accepted_epc_property_types
else (
x[self.EPC_API_DATA_NAMES["property-type"]]
if not pd.isnull(
@ -1373,9 +1387,9 @@ class AssetList:
self.standardised_asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]]
/ x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
/ x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]]
/ x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
/ x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
),
axis=1,
)
@ -1460,7 +1474,7 @@ class AssetList:
year_lower_bound = (
2007
if x[self.EPC_API_DATA_NAMES["construction-age-band"]]
== "England and Wales: 2007 onwards"
== "England and Wales: 2007 onwards"
else 2012
)
@ -1515,7 +1529,7 @@ class AssetList:
age_band_matches = (
"EPC Age Band Matches Year Built"
if x[self.STANDARD_YEAR_BUILT]
== int(x[self.EPC_API_DATA_NAMES["construction-age-band"]])
== int(x[self.EPC_API_DATA_NAMES["construction-age-band"]])
else "EPC Age Band is different from Year Built"
)
@ -1545,7 +1559,7 @@ class AssetList:
age_band_matches = (
"EPC Age Band Matches Year Built"
if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date))
and (x[self.STANDARD_YEAR_BUILT] <= float(upper_date))
and (x[self.STANDARD_YEAR_BUILT] <= float(upper_date))
else (
"EPC Age Band is older than Year Built"
if x[self.STANDARD_YEAR_BUILT] > float(upper_date)
@ -1717,22 +1731,22 @@ class AssetList:
if self.non_intrusives_present:
if self.new_format_non_insturives_present_v2:
non_intrusives_wall_filter = (
self.standardised_asset_list["non-intrusives: Construction"]
== "CAVITY"
) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
self.standardised_asset_list["non-intrusives: Construction"]
== "CAVITY"
) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EMPTY", "PARTIAL", "EMPTY CAVITY"]
)
else:
non_intrusives_wall_filter = (
self.standardised_asset_list["non-intrusives: Construction"]
== "CAVITY"
) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
self.standardised_asset_list["non-intrusives: Construction"]
== "CAVITY"
) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EMPTY", "PARTIAL"]
)
elif self.old_format_non_intrusives_present:
non_intrusives_wall_filter = self.standardised_asset_list[
"non-intrusives: WFT Findings"
].str.lower().str.strip().isin(
"non-intrusives: WFT Findings"
].str.lower().str.strip().isin(
[
"empty cavity",
"partial fill",
@ -1742,18 +1756,18 @@ class AssetList:
"empty cav",
]
) | (
(
self.standardised_asset_list["non-intrusives: WFT Findings"]
.str.lower()
.str.strip()
.str.contains("empty cavity|partial fill")
& ~self.standardised_asset_list["non-intrusives: WFT Findings"]
.astype(str)
.str.lower()
.str.strip()
.str.contains("major access issues")
)
)
(
self.standardised_asset_list["non-intrusives: WFT Findings"]
.str.lower()
.str.strip()
.str.contains("empty cavity|partial fill")
& ~self.standardised_asset_list["non-intrusives: WFT Findings"]
.astype(str)
.str.lower()
.str.strip()
.str.contains("major access issues")
)
)
else:
# We set the filter to False, as we have no non-intrusives
non_intrusives_wall_filter = False
@ -1765,12 +1779,12 @@ class AssetList:
)
else:
year_built_filter = (
self.standardised_asset_list[self.STANDARD_YEAR_BUILT]
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
) | (
self.standardised_asset_list["epc_year_upper_bound"]
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
)
self.standardised_asset_list[self.STANDARD_YEAR_BUILT]
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
) | (
self.standardised_asset_list["epc_year_upper_bound"]
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
)
# Criteria:
# The property isn't a bedsit
@ -1811,8 +1825,8 @@ class AssetList:
] = (
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
& ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity_has_solar"
]
"non_intrusive_indicates_empty_cavity_has_solar"
]
& (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
["bedsit"]
@ -1888,8 +1902,8 @@ class AssetList:
.str.lower()
.isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS)
| self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
["uninsulated cavity"]
)
["uninsulated cavity"]
)
)
######################################################
@ -1926,8 +1940,8 @@ class AssetList:
extraction_wall_filter = (
extraction_wall_filter
& ~self.standardised_asset_list[
"non-intrusives: Eligibility (Red/Yellow/Green)"
].isin(["RED"])
"non-intrusives: Eligibility (Red/Yellow/Green)"
].isin(["RED"])
)
self.standardised_asset_list[
@ -2023,26 +2037,26 @@ class AssetList:
self.standardised_asset_list[
"solar_epc_data_indicates_correct_heating_system"
] = (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains(
"air source heat pump|ground source heat pump|boiler and radiators, electric"
)
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains("electric storage heaters")
& (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
== "Controls for high heat retention storage heaters"
)
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains(
"air source heat pump|ground source heat pump|boiler and radiators, electric"
)
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains("electric storage heaters")
& (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
== "Controls for high heat retention storage heaters"
)
)
# If the landlord has given us the heating system, we default to that on heating upgrades. Because of the
# poor heating in place, if the EPC indicates that this property had a low efficiency heating system but the
@ -2050,25 +2064,25 @@ class AssetList:
self.standardised_asset_list[
"solar_epc_data_indicates_requires_heating_upgrade"
] = (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains("electric storage heaters|room heaters")
& (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
.str.lower()
.str.contains("electric storage heaters|room heaters")
& (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
!= "Controls for high heat retention storage heaters"
)
) & (
~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["district heating", "communal heating", "communal gas boiler"]
)
& ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
.astype(str)
.str.contains("gas ")
!= "Controls for high heat retention storage heaters"
)
) & (
~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["district heating", "communal heating", "communal gas boiler"]
)
& ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
.astype(str)
.str.contains("gas ")
)
# Basic check - both of the previous two shouldn't be true simultaneously
if (
@ -2148,8 +2162,8 @@ class AssetList:
self.standardised_asset_list[
"solar_non_intrusives_walls_insulated"
] = self.standardised_asset_list[
"non-intrusives: WFT Findings"
].str.lower().str.strip().isin(
"non-intrusives: WFT Findings"
].str.lower().str.strip().isin(
[
"retro drilled",
"retro filled",
@ -2158,8 +2172,8 @@ class AssetList:
"retro drilled and filled",
]
) | self.standardised_asset_list[
"non-intrusives: WFT Findings"
].str.lower().str.strip().str.contains(
"non-intrusives: WFT Findings"
].str.lower().str.strip().str.contains(
"retro drilled"
)
else:
@ -2176,19 +2190,14 @@ class AssetList:
)
self.standardised_asset_list["solar_epc_walls_insulated"] = (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES[
"walls-description"]]
.str.lower()
.str.contains("|".join(
self.EPC_INSULATED_WALLS_SUBSTRINGS))
) | (
self.standardised_asset_list[
"walls_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(
x) else False
)
)
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]]
.str.lower()
.str.contains("|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS))
) | (
self.standardised_asset_list["walls_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(x) else False
)
)
roof_data = []
for desc in self.standardised_asset_list[
@ -2230,20 +2239,20 @@ class AssetList:
self.standardised_asset_list[
"solar_epc_loft_needs_topup"
] = self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
].apply(
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
].apply(
lambda x: int(x) < 200 if str(x).isdigit() else False
) | (
(
self.standardised_asset_list["is_loft"]
| self.standardised_asset_list["is_pitched"]
)
& (
self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
].isin(["below average", "none"])
)
(
self.standardised_asset_list["is_loft"]
| self.standardised_asset_list["is_pitched"]
)
& (
self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
].isin(["below average", "none"])
)
)
self.standardised_asset_list["epc_has_floor_recommendation"] = (
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
@ -2252,16 +2261,15 @@ class AssetList:
# Check if the boiler is electric
# We check if it contains both the terms boiler & electric
self.standardised_asset_list["has_electric_boiler"] = (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.isin(["boiler and radiators, electric"])
) | (
self.standardised_asset_list[
self.STANDARD_HEATING_SYSTEM]
== "electric boiler"
)
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.isin(["boiler and radiators, electric"])
) | (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
== "electric boiler"
)
####################################
# Check solar eligibility
@ -2399,11 +2407,11 @@ class AssetList:
empty_cavity_map = {
"non_intrusive_indicates_empty_cavity": self.EMPTY_CAVITY_NON_INTRUSIVE
+ ": ",
+ ": ",
"non_intrusive_indicates_empty_cavity_has_solar": f"{self.EMPTY_CAVITY_NON_INTRUSIVE} - property "
"already has solar: ",
"already has solar: ",
"non_intrusive_indicates_empty_cavity_no_year_filter": f"{self.EMPTY_CAVITY_NON_INTRUSIVE}, "
f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
}
for variable, description in empty_cavity_map.items():
self.standardised_asset_list["cavity_reason"] = np.where(
@ -2419,8 +2427,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity"
]
"non_intrusive_indicates_empty_cavity"
]
& (
self.standardised_asset_list["non-intrusives: WFT Findings"]
.str.lower()
@ -2445,8 +2453,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity"
]
"non_intrusive_indicates_empty_cavity"
]
& self.standardised_asset_list[
"non_intrusive_indicates_cavity_extraction"
]
@ -2461,8 +2469,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity"
]
"non_intrusive_indicates_empty_cavity"
]
& (
self.standardised_asset_list["non-intrusives: Insulated"]
== "RETRO DRILLED"
@ -2478,8 +2486,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity"
]
"non_intrusive_indicates_empty_cavity"
]
& (
self.standardised_asset_list["non-intrusives: Insulated"]
== "FILLED AT BUILD"
@ -2495,8 +2503,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity"
]
"non_intrusive_indicates_empty_cavity"
]
& pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY}: " + self.standardised_asset_list["SAP Category"],
@ -2640,7 +2648,7 @@ class AssetList:
identified_work = self.standardised_asset_list[
~pd.isnull(self.standardised_asset_list["cavity_reason"])
| ~pd.isnull(self.standardised_asset_list["solar_reason"])
][self.DOMNA_PROPERTY_ID].values
][self.DOMNA_PROPERTY_ID].values
if self.DOMNA_PROPERTY_ID in self.outcomes.columns:
self.outcomes_for_output = self.outcomes[
@ -2675,12 +2683,12 @@ class AssetList:
blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
== "block of flats"
]
]
non_blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
!= "block of flats"
]
]
# Produce some aggregate figures
self.work_type_figures = {
@ -2723,7 +2731,7 @@ class AssetList:
blocks = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
== "block of flats"
].copy()
].copy()
if blocks.empty:
return
@ -2860,7 +2868,7 @@ class AssetList:
self.standardised_asset_list = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
!= "block of flats"
]
]
self.standardised_asset_list = pd.concat(
[self.standardised_asset_list, expanded_blocks], ignore_index=True
@ -2940,7 +2948,7 @@ class AssetList:
# find any block refs with more than 50% emptires
viable_empty_blocks = self.block_analysis_df[
self.block_analysis_df["Percentage of Empties"] >= 0.50
]
]
if not viable_empty_blocks.empty:
project_code_lookup = viable_empty_blocks[["Block Reference"]].copy()
@ -3179,7 +3187,7 @@ class AssetList:
contact_details = pd.read_excel(local_filepath, sheet_name=sheet_name)[
[self.contact_detail_fields["landlord_property_id"]] + details_colnames
]
]
contact_details = contact_details[
~pd.isnull(
contact_details[self.contact_detail_fields["landlord_property_id"]]
@ -3572,13 +3580,10 @@ class AssetList:
"Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>": date_of_inspections,
"Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>": non_intrusives_construction,
"Non-intrusives: Insulation <LISTING non_intrusives__insulation>": non_intrusives_insulated,
"Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>":
non_intrusives_insulation_material,
"Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>":
non_intrusives_ciga_check_required,
"Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>": non_intrusives_insulation_material,
"Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>": non_intrusives_ciga_check_required,
"Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>": non_intrusives_pv_access,
"Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>":
non_intrusives_roof_orientation,
"Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>": non_intrusives_roof_orientation,
"Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>": non_intrusives_surveyor_notes,
"Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>": non_intrusives_surveyor_name,
"CIGA: Date Requested <LISTING ciga__date_requested>": None, # TODO: Don't have this for the moment
@ -3755,8 +3760,8 @@ class AssetList:
# We compare address line 1 to full address
if any(
df[self.STANDARD_FULL_ADDRESS]
.str.lower()
.str.contains(row["Address Line 1"].lower(), na=False)
.str.lower()
.str.contains(row["Address Line 1"].lower(), na=False)
):
df = df[
df[self.STANDARD_FULL_ADDRESS]
@ -3996,7 +4001,7 @@ class AssetList:
matched = matched[
matched["houseno"].astype(str) == house_no_to_match
]
]
if matched.shape[0] == 1:
lookup_i.append(
{
@ -4021,7 +4026,7 @@ class AssetList:
)[0]
matched = matched[
matched[self.STANDARD_FULL_ADDRESS] == best_match
]
]
lookup_i.append(
{
"row_id": x["row_id"],
@ -4332,7 +4337,7 @@ class AssetList:
df = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID]
== row[master_id_colnames[idx]]
]
]
if df.shape[0] == 1:
matched.append(
{
@ -4438,7 +4443,7 @@ class AssetList:
)[1]
)
> 90
]
]
if df.shape[0] == 0:
unmatched.append(row["row_id"])
@ -4446,8 +4451,8 @@ class AssetList:
if any(
df[self.STANDARD_FULL_ADDRESS]
.str.lower()
.str.contains(
.str.lower()
.str.contains(
" ".join(
[row[house_no_col], row["Street / Block Name"]]
).lower()
@ -4474,7 +4479,7 @@ class AssetList:
row[property_type_col].split(" ")[-1].lower()
)
& (df[self.STANDARD_PROPERTY_TYPE] != "block of flats")
]
]
if df.shape[0] != 1:
# We have multiple matches - it's likely because the landlord has a duplicate

View file

@ -21,6 +21,11 @@ EPC_AUTH_TOKEN = os.getenv(
OPENAI_API_KEY = os.getenv(
"OPENAI_API_KEY",
)
print(
f"[debug] OPENAI_API_KEY loaded: "
f"{OPENAI_API_KEY[:8]}...{OPENAI_API_KEY[-4:] if OPENAI_API_KEY else 'NONE'} "
f"(len={len(OPENAI_API_KEY) if OPENAI_API_KEY else 0})"
)
def extract_address1(
@ -74,23 +79,23 @@ def app():
"""
data_folder = "/workspaces/model/asset_list"
data_filename = "2026-04-22T08_22_00.779745_61049fd3.xlsx"
sheet_name = "in"
postcode_column = "postcode_clean"
address1_column = "address2uprn_address"
data_filename = "input.xlsx"
sheet_name = "Handovers"
postcode_column = "POSTCODE"
address1_column = "Full Addres"
address1_method = None
fulladdress_column = "address2uprn_address"
fulladdress_column = "Full Addres"
address_cols_to_concat = []
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = "address2uprn_uprn"
landlord_property_type = "Property Type" # Good to include if landlord gave
landlord_built_form = "Built Form" # Good to include if landlord gave
landlord_os_uprn = "domna_found_uprn"
landlord_property_type = "PROPERTY TYPE" # Good to include if landlord gave
landlord_built_form = "Type Description" # Good to include if landlord gave
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "UPRN"
landlord_property_id = "PROP REF"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@ -131,6 +136,7 @@ def app():
landlord_sap=landlord_sap,
landlord_block_reference=landlord_block_reference,
phase=phase,
openai_api_key=OPENAI_API_KEY,
)
asset_list.init_standardise()
@ -462,3 +468,9 @@ def app():
asset_list.duplicated_addresses.to_excel(
writer, sheet_name="Duplicate Properties", index=False
)
for key,value in dict.items():
lsakjfldsa

View file

@ -77,6 +77,7 @@ class Settings(BaseSettings):
OSMOSIS_ACD_SHAREPOINT_ID: Optional[str] = None
PRIVATE_PAY_SHAREPOINT_ID: Optional[str] = None
SOCIAL_HOUSING_WAVE_3_SHAREPOINT_ID: Optional[str] = None
OPENAI_API_KEY: Optional[str] = None
# Pas Hub
PASHUB_EMAIL: Optional[str] = None

View file

@ -2,8 +2,8 @@ from fastapi import APIRouter, HTTPException, status
from jose import jwt, jwe
import json
import datetime
from app.config import get_settings
from app.dependencies import get_derived_encryption_key
from backend.app.config import get_settings
from backend.app.dependencies import get_derived_encryption_key
router = APIRouter(
prefix="/local",
@ -27,7 +27,12 @@ def create_dummy_token(secret: str) -> str:
"dbId": "known_id",
}
token = jwe.encrypt(json.dumps(claims), get_derived_encryption_key(secret), algorithm="dir", encryption="A256GCM")
token = jwe.encrypt(
json.dumps(claims),
get_derived_encryption_key(secret),
algorithm="dir",
encryption="A256GCM",
)
return token
@ -40,6 +45,8 @@ async def dummy_token():
async def dummy_token():
settings = get_settings()
if settings.ENVIRONMENT != "local":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Dummy token can only be generated in local environment")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Dummy token can only be generated in local environment",
)
return {"dummy_token": create_dummy_token(settings.SECRET_KEY)}

View file

@ -30,10 +30,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
logger.error(f"Validation Errors: {exc.errors()}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({
"detail": exc.errors(),
"body": exc.body
}),
content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
)
@ -63,7 +60,8 @@ app.include_router(tasks_router.router, prefix="/v1")
app.include_router(bulk_uploads_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local":
from app.local import router as local_router
from backend.app.local import router as local_router
app.include_router(local_router.router)
handler = Mangum(app)
@ -98,10 +96,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
logger.error(f"Validation Errors: {exc.errors()}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({
"detail": exc.errors(),
"body": exc.body
}),
content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
)
@ -130,7 +125,8 @@ app.include_router(whlg_router.router, prefix="/v1")
app.include_router(bulk_uploads_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local":
from app.local import router as local_router
from backend.app.local import router as local_router
app.include_router(local_router.router)
handler = Mangum(app)

View file

@ -21,28 +21,28 @@ regional_labour_variations = [
{"Region": "Yorkshire and the Humber", "Adjustment_Factor": 0.86},
{"Region": "Wales", "Adjustment_Factor": 0.88},
{"Region": "Scotland", "Adjustment_Factor": 0.88},
{"Region": "Northern Ireland", "Adjustment_Factor": 0.76}
{"Region": "Northern Ireland", "Adjustment_Factor": 0.76},
]
# Installers are now working with 435 watt panels
PANEL_SIZE = 0.435
INSTALLER_SOLAR_COSTS = [
{'n_panels': 4, 'array_kwp': 4 * PANEL_SIZE, 'cost': 4089.25, 'installer': 'CEG'},
{'n_panels': 5, 'array_kwp': 5 * PANEL_SIZE, 'cost': 4242.48, 'installer': 'CEG'},
{'n_panels': 6, 'array_kwp': 6 * PANEL_SIZE, 'cost': 4395.71, 'installer': 'CEG'},
{'n_panels': 7, 'array_kwp': 7 * PANEL_SIZE, 'cost': 4548.94, 'installer': 'CEG'},
{'n_panels': 8, 'array_kwp': 8 * PANEL_SIZE, 'cost': 4702.17, 'installer': 'CEG'},
{'n_panels': 9, 'array_kwp': 9 * PANEL_SIZE, 'cost': 4855.41, 'installer': 'CEG'},
{'n_panels': 10, 'array_kwp': 10 * PANEL_SIZE, 'cost': 5010.95, 'installer': 'CEG'},
{'n_panels': 11, 'array_kwp': 11 * PANEL_SIZE, 'cost': 5166.49, 'installer': 'CEG'},
{'n_panels': 12, 'array_kwp': 12 * PANEL_SIZE, 'cost': 5322.04, 'installer': 'CEG'},
{'n_panels': 13, 'array_kwp': 13 * PANEL_SIZE, 'cost': 5657.6, 'installer': 'CEG'},
{'n_panels': 14, 'array_kwp': 14 * PANEL_SIZE, 'cost': 5993.16, 'installer': 'CEG'},
{'n_panels': 15, 'array_kwp': 15 * PANEL_SIZE, 'cost': 6328.71, 'installer': 'CEG'},
{'n_panels': 16, 'array_kwp': 16 * PANEL_SIZE, 'cost': 6483.33, 'installer': 'CEG'},
{'n_panels': 17, 'array_kwp': 17 * PANEL_SIZE, 'cost': 6637.95, 'installer': 'CEG'},
{'n_panels': 18, 'array_kwp': 18 * PANEL_SIZE, 'cost': 6792.57, 'installer': 'CEG'}
{"n_panels": 4, "array_kwp": 4 * PANEL_SIZE, "cost": 4089.25, "installer": "CEG"},
{"n_panels": 5, "array_kwp": 5 * PANEL_SIZE, "cost": 4242.48, "installer": "CEG"},
{"n_panels": 6, "array_kwp": 6 * PANEL_SIZE, "cost": 4395.71, "installer": "CEG"},
{"n_panels": 7, "array_kwp": 7 * PANEL_SIZE, "cost": 4548.94, "installer": "CEG"},
{"n_panels": 8, "array_kwp": 8 * PANEL_SIZE, "cost": 4702.17, "installer": "CEG"},
{"n_panels": 9, "array_kwp": 9 * PANEL_SIZE, "cost": 4855.41, "installer": "CEG"},
{"n_panels": 10, "array_kwp": 10 * PANEL_SIZE, "cost": 5010.95, "installer": "CEG"},
{"n_panels": 11, "array_kwp": 11 * PANEL_SIZE, "cost": 5166.49, "installer": "CEG"},
{"n_panels": 12, "array_kwp": 12 * PANEL_SIZE, "cost": 5322.04, "installer": "CEG"},
{"n_panels": 13, "array_kwp": 13 * PANEL_SIZE, "cost": 5657.6, "installer": "CEG"},
{"n_panels": 14, "array_kwp": 14 * PANEL_SIZE, "cost": 5993.16, "installer": "CEG"},
{"n_panels": 15, "array_kwp": 15 * PANEL_SIZE, "cost": 6328.71, "installer": "CEG"},
{"n_panels": 16, "array_kwp": 16 * PANEL_SIZE, "cost": 6483.33, "installer": "CEG"},
{"n_panels": 17, "array_kwp": 17 * PANEL_SIZE, "cost": 6637.95, "installer": "CEG"},
{"n_panels": 18, "array_kwp": 18 * PANEL_SIZE, "cost": 6792.57, "installer": "CEG"},
]
# These are costs we received from CRG, for pricing up air source heat pumps
@ -80,7 +80,12 @@ INSTALLER_SOLAR_PV_INVERTER_COST = 7500
INSTALLER_SOLAR_PV_INVERTER_LABOUR_COST = 500 # Just a rough guess to labour costs
INSTALLER_SOLAR_BATTERY_COSTS = [
{'capacity_kwh': 5, 'description': 'Battery Add on', 'cost': 3769.89, 'installer': 'JJC'},
{
"capacity_kwh": 5,
"description": "Battery Add on",
"cost": 3769.89,
"installer": "JJC",
},
# {'capacity_kwh': 10, 'description': 'Battery Add on', 'cost': 4300.00, 'installer': 'CEG'},
# {'capacity_kwh': 5, 'description': 'Battery Retrofit existing system', 'cost': 4250.00, 'installer': 'CEG'},
# {'capacity_kwh': 10, 'description': 'Battery Retrofit Existing system', 'cost': 5950.00, 'installer': 'CEG'}
@ -102,10 +107,14 @@ TTZC_SMART_THERMOSTAT_LABOUR_HOURS = 2
TTZC_ELECTRICIAN_HOURLY_RATE = 45
# Based on cost of a Nest temperature sensor
TTZC_ROOM_TEMPERATURE_SENSOR_COST = 50
TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS = 0.17 # (Assume ~ 10 mins install per sensor)
TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS = (
0.17 # (Assume ~ 10 mins install per sensor)
)
# Basedon an average cost of smart radiator values
TTZC_SMART_RADIATOR_VALUES = 50
TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS = 0.37 # (Assume ~ 15-30 mins install per valve)
TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS = (
0.37 # (Assume ~ 15-30 mins install per valve)
)
# boiler prices based on
# This is the cost of a firs time central heating install from The Warm Front rate card
@ -169,7 +178,7 @@ class Costs:
"heater_removal": 0.1,
"sealing_open_fireplace": 0.1,
"mechanical_ventilation": 0.26,
"sloping_ceiling_insulation": 0.26 # Similar to IWI so using the same contingency
"sloping_ceiling_insulation": 0.26, # Similar to IWI so using the same contingency
}
# Preliminaries are a percentage of the total cost of the work and covers the cost of site-specific costs
@ -195,36 +204,46 @@ class Costs:
:param property_instance: Instance of a Property class containing relevant details like wall area.
"""
if not hasattr(property_instance, 'insulation_wall_area'):
raise ValueError("Property instance must have an 'insulation_wall_area' attribute")
if not hasattr(property_instance, "insulation_wall_area"):
raise ValueError(
"Property instance must have an 'insulation_wall_area' attribute"
)
self.property = property_instance
self.regional_labour_variations = regional_labour_variations
self.region = county_to_region_map.get(self.property.epc_record.county, None)
if self.region is None:
# Try and grab using the local-authority-label
self.region = county_to_region_map.get(self.property.epc_record.local_authority_label, None)
self.region = county_to_region_map.get(
self.property.epc_record.local_authority_label, None
)
if self.region is None:
# Try and get the region after converting the keys to lower
self.region = {
k.lower(): v for k, v in county_to_region_map.items()
}.get(self.property.epc_record.local_authority_label.lower(), None)
if self.property.epc_record.local_authority_label is not None:
self.region = {
k.lower(): v for k, v in county_to_region_map.items()
}.get(self.property.epc_record.local_authority_label.lower(), None)
if self.region is None:
logger.warning("No region found for county %s, defaulting to South East England",
self.property.epc_record.county)
logger.warning(
"No region found for county %s, defaulting to South East England",
self.property.epc_record.county,
)
self.region = "South East England"
self.labour_adjustment_factor = [
x["Adjustment_Factor"] for x in self.regional_labour_variations if
x["Region"] == self.region
x["Adjustment_Factor"]
for x in self.regional_labour_variations
if x["Region"] == self.region
][0]
if not self.labour_adjustment_factor:
raise ValueError("Labour adjustment factor not found")
def cavity_wall_insulation(self, wall_area, material, is_extraction_and_refill=False):
def cavity_wall_insulation(
self, wall_area, material, is_extraction_and_refill=False
):
"""
Calculates the total cost for cavity wall insulation based on material and labor costs,
including contingency, preliminaries, profit, and VAT.
@ -318,7 +337,8 @@ class Costs:
return {
"total": total_cost,
"contingency": self.CONTINGENCIES["suspended_floor_insulation"] * total_cost,
"contingency": self.CONTINGENCIES["suspended_floor_insulation"]
* total_cost,
"contingency_rate": self.CONTINGENCIES["suspended_floor_insulation"],
"labour_hours": labour_hours,
"labour_days": labour_days,
@ -370,8 +390,7 @@ class Costs:
# - Apply sub-linear scaling for realism
# - Enforce a minimum duration so estimates are not unrealistically low
labour_days = max(
min_days,
base_days * (insulation_floor_area / base_area) ** labour_exponent
min_days, base_days * (insulation_floor_area / base_area) ** labour_exponent
)
return labour_days
@ -388,7 +407,9 @@ class Costs:
total_cost = material["total_cost"] * insulation_floor_area
daily_labour_rate = 300 # Based on checkatrade
labour_days = self._estimate_number_of_days_for_solid_floor(insulation_floor_area)
labour_days = self._estimate_number_of_days_for_solid_floor(
insulation_floor_area
)
labour_cost = labour_days * daily_labour_rate
total_cost = total_cost + labour_cost
@ -404,7 +425,6 @@ class Costs:
}
def low_energy_lighting(self, number_of_lights, material):
"""
Calculates the total cost for low energy lighting based on material and labor costs,
including contingency, preliminaries, profit, and VAT.
@ -419,7 +439,7 @@ class Costs:
total_cost = material["total_cost"] * number_of_lights
labour_hours = 1
labour_days = (labour_hours / 8)
labour_days = labour_hours / 8
return {
"total": total_cost,
@ -450,26 +470,22 @@ class Costs:
}
@classmethod
def solar_pv(
cls,
solar_product,
scaffolding_options,
n_floors
):
"""
"""
def solar_pv(cls, solar_product, scaffolding_options, n_floors):
""" """
system_cost = solar_product["total_cost"]
if not solar_product["includes_scaffolding"]:
# We base this on the number of floors
scaffolding = [x["total_cost"] for x in scaffolding_options if x["size"] == n_floors]
scaffolding = [
x["total_cost"] for x in scaffolding_options if x["size"] == n_floors
]
if not scaffolding:
# If we have no options, handle this
if n_floors <= 3:
raise ValueError("No scaffolding options available for 3 or fewer floors")
raise ValueError(
"No scaffolding options available for 3 or fewer floors"
)
# We take the largest scaffolding option available
scaffolding_cost = max([x["total_cost"] for x in scaffolding_options])
else:
@ -523,9 +539,9 @@ class Costs:
We base the estimates for the cost of electric room heaters on the cost per room as estimated by the
following article:
https://www.bestelectricradiators.co.uk/blog/cost-to-install-a-new-heating-system-uk/
:param number_heated_rooms: int, number of rooms to be heated
:return:
:return:
"""
total_cost = 500 * number_heated_rooms
@ -547,11 +563,11 @@ class Costs:
}
def high_heat_electric_storage_heaters(
self, number_heated_rooms: int,
self,
number_heated_rooms: int,
needs_cylinder: bool,
product: dict | None = None
product: dict | None = None,
):
"""
We base the estimates for the cost of electric storage heaters on the cost per room as estimated by the
energy saving trust
@ -578,8 +594,11 @@ class Costs:
return {
"total": total_cost,
"contingency": total_cost * self.CONTINGENCIES["high_heat_retention_storage_heaters"],
"contingency_rate": self.CONTINGENCIES["high_heat_retention_storage_heaters"],
"contingency": total_cost
* self.CONTINGENCIES["high_heat_retention_storage_heaters"],
"contingency_rate": self.CONTINGENCIES[
"high_heat_retention_storage_heaters"
],
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@ -690,14 +709,14 @@ class Costs:
# The product costs are inclusive of VAT
product_costs = (
TTZC_SMART_THERMOSTAT_COST +
TTZC_ROOM_TEMPERATURE_SENSOR_COST * number_heated_rooms +
TTZC_SMART_RADIATOR_VALUES * number_heated_rooms
TTZC_SMART_THERMOSTAT_COST
+ TTZC_ROOM_TEMPERATURE_SENSOR_COST * number_heated_rooms
+ TTZC_SMART_RADIATOR_VALUES * number_heated_rooms
)
labour_hours = (
TTZC_SMART_THERMOSTAT_LABOUR_HOURS +
TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS * number_heated_rooms +
TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS * number_heated_rooms
TTZC_SMART_THERMOSTAT_LABOUR_HOURS
+ TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS * number_heated_rooms
+ TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS * number_heated_rooms
)
labour_costs = TTZC_ELECTRICIAN_HOURLY_RATE * labour_hours
# Add continency and preliminaries to the labour to account for the complexity of the job
@ -722,7 +741,9 @@ class Costs:
"labour_days": labour_days,
}
def programmer_trvs_bypass(self, number_heated_rooms, has_programmer, has_trvs, has_bypass):
def programmer_trvs_bypass(
self, number_heated_rooms, has_programmer, has_trvs, has_bypass
):
total_cost = 0
labour_hours = 0
@ -779,7 +800,9 @@ class Costs:
}
@staticmethod
def _estimate_n_radiators(number_habitable_rooms, total_floor_area, property_type, built_form):
def _estimate_n_radiators(
number_habitable_rooms, total_floor_area, property_type, built_form
):
# Base number of radiators: one per habitable room
base_radiators = number_habitable_rooms
@ -787,34 +810,49 @@ class Costs:
additional_radiators = 3 # Initial assumption
# Adjust additional radiators based on property type
if property_type == 'Flat':
additional_radiators -= 1 # Flats may need fewer radiators due to less exposure
elif property_type in ['House', 'Bungalow', 'Maisonette']:
if property_type == "Flat":
additional_radiators -= (
1 # Flats may need fewer radiators due to less exposure
)
elif property_type in ["House", "Bungalow", "Maisonette"]:
# Multiple floors in Maisonette may require additional heating points
additional_radiators += 2 # Houses and bungalows might need more due to greater exposure
additional_radiators += (
2 # Houses and bungalows might need more due to greater exposure
)
else:
raise Exception("Invalid property type")
# Adjust total radiator needs based on built form
form_factor = {
'Enclosed Mid-Terrace': 0.9,
'Mid-Terrace': 0.95,
'Enclosed End-Terrace': 0.95,
'Semi-Detached': 1.05,
'Detached': 1.25,
'End-Terrace': 1.05
"Enclosed Mid-Terrace": 0.9,
"Mid-Terrace": 0.95,
"Enclosed End-Terrace": 0.95,
"Semi-Detached": 1.05,
"Detached": 1.25,
"End-Terrace": 1.05,
}
# Calculate total heating power needed and number of radiators based on standard output
total_heating_power_required = total_floor_area * 80 # Watts per square meter
radiator_output = 1000 # Average wattage per radiator
total_radiators_based_on_power = (total_heating_power_required / radiator_output) * form_factor[built_form]
total_radiators_based_on_power = (
total_heating_power_required / radiator_output
) * form_factor[built_form]
# Final estimation taking the higher of calculated needs or base room count
estimated_radiators = max(total_radiators_based_on_power, base_radiators + additional_radiators)
estimated_radiators = max(
total_radiators_based_on_power, base_radiators + additional_radiators
)
return round(estimated_radiators)
def boiler(self, exising_room_heaters, system_change, n_heated_rooms, n_rooms, is_electric=False):
def boiler(
self,
exising_room_heaters,
system_change,
n_heated_rooms,
n_rooms,
is_electric=False,
):
"""
Based on a basic estimate of median value £2600 to install a low carbon combi boiler
First time central heating vosts can als be found here:
@ -859,12 +897,14 @@ class Costs:
number_habitable_rooms=n_rooms,
total_floor_area=self.property.floor_area,
property_type=self.property.epc_record.property_type,
built_form=self.property.epc_record.built_form
built_form=self.property.epc_record.built_form,
)
additionals_labour_cost = labour_rate * self.labour_adjustment_factor
radiator_cost = DOUBLE_RADIATOR_COST * n_radiators
system_change_cost = radiator_cost + FLUE_COST + PIPEWORK_COST + additionals_labour_cost
system_change_cost = (
radiator_cost + FLUE_COST + PIPEWORK_COST + additionals_labour_cost
)
system_change_cost_before_vat = system_change_cost / (1 + self.VAT_RATE)
system_change_vat = system_change_cost - system_change_cost_before_vat
# We add an extra labour day for the system change
@ -897,14 +937,18 @@ class Costs:
else:
return 250
def air_source_heat_pump(self, ashp_size: float, number_heated_rooms: int, total_floor_area: float) -> dict:
def air_source_heat_pump(
self, ashp_size: float, number_heated_rooms: int, total_floor_area: float
) -> dict:
"""
We produce a cost estimation for an air source heat pump, based on costs we have received from installers.
"""
system_cost = (
(ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST) + ASHP_SECURITY + ASHP_WALL_BRACKET
(ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST)
+ ASHP_SECURITY
+ ASHP_WALL_BRACKET
)
available_n_rads = [x["n_radiators"] for x in ASHP_DISTRIBUTION_SYSTEM_COSTS]
@ -940,7 +984,9 @@ class Costs:
}
@staticmethod
def _estimate_number_of_days_for_sloping_ceiling(insulation_roof_area: float) -> float:
def _estimate_number_of_days_for_sloping_ceiling(
insulation_roof_area: float,
) -> float:
"""
Estimate labour days required to insulate an existing sloping ceiling.
@ -965,14 +1011,15 @@ class Costs:
min_days = 2
labour_days = max(
min_days,
base_days * (insulation_roof_area / base_area) ** labour_exponent
min_days, base_days * (insulation_roof_area / base_area) ** labour_exponent
)
return labour_days
@classmethod
def sloping_ceiling_insulation(cls, insulation_roof_area: float) -> Mapping[str, float]:
def sloping_ceiling_insulation(
cls, insulation_roof_area: float
) -> Mapping[str, float]:
"""
This costing for this is based on Checkatrade desktop research, since we are yet to receive installer quotes.
:param insulation_roof_area: Area of the sloping ceiling to be insulated
@ -985,14 +1032,20 @@ class Costs:
# https://www.checkatrade.com/blog/cost-guides/vaulted-ceiling-cost/
# https://www.thegreenage.co.uk/can-i-insulate-my-sloping-ceiling/
# These assumptions last updated 21/02/2026
insulation_cost_per_m2 = 52 # The actual install process is quite similar to IWI
insulation_cost_per_m2 = (
52 # The actual install process is quite similar to IWI
)
labour_rate = 250 # per day
contingency_rate = cls.CONTINGENCIES["sloping_ceiling_insulation"]
labour_days = cls._estimate_number_of_days_for_sloping_ceiling(insulation_roof_area)
labour_days = cls._estimate_number_of_days_for_sloping_ceiling(
insulation_roof_area
)
labour_hours = labour_days * 8
total = (insulation_cost_per_m2 * insulation_roof_area) + (labour_rate * labour_days)
total = (insulation_cost_per_m2 * insulation_roof_area) + (
labour_rate * labour_days
)
# Assume VAT included in the total => total is 120% of subtotal
vat = total - (total / 1.2)