diff --git a/asset_list/AssetList.py b/asset_list/AssetList.py
index dede3162..573c4f7c 100644
--- a/asset_list/AssetList.py
+++ b/asset_list/AssetList.py
@@ -31,17 +31,19 @@ from recommendations.recommendation_utils import (
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
-from dotenv import load_dotenv
+# from dotenv import load_dotenv
logger = setup_logger()
-load_dotenv(dotenv_path="../backend/.env")
+# load_dotenv(dotenv_path="../backend/.env")
# OpenAI API Key (set this in your environment variables for security)
-OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
+# OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
- def __init__(self, standard_values, standard_map=None, max_tokens=1000):
+ def __init__(
+ self, standard_values, standard_map=None, max_tokens=1000, api_key=None
+ ):
"""
Initialize the remapper with standard values and a predefined mapping.
@@ -75,7 +77,8 @@ class DataRemapper:
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
}
- self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
+ print(f"DATA REMAPPER api key is {api_key}")
+ self.openai_client = OpenAI(api_key=api_key)
@staticmethod
def clean_string(text):
@@ -136,12 +139,20 @@ class DataRemapper:
raise ValueError("Input tokens exceed the maximum limit.")
logger.info("Calling OpenAI API for standardization...")
- response = self.openai_client.chat.completions.create(
- model=self.ai_model,
- messages=[{"role": "user", "content": prompt}],
- max_tokens=self.max_tokens,
- temperature=0.1,
- )
+
+ try:
+ response = self.openai_client.chat.completions.create(
+ model=self.ai_model,
+ messages=[{"role": "user", "content": prompt}],
+ max_tokens=self.max_tokens,
+ temperature=0.1,
+ )
+ except Exception as e:
+ print(f"[debug] OpenAI call failed. type={type(e).__name__}")
+ print(f"[debug] status={getattr(e, 'status_code', None)}")
+ print(f"[debug] body={getattr(e, 'response', None) and e.response.text}")
+ print(f"[debug] model={self.ai_model}")
+ raise
output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens
@@ -504,6 +515,7 @@ class AssetList:
landlord_block_reference=None,
phase=False,
header=0,
+ openai_api_key=None,
):
self.local_filepath = local_filepath
self.sheet_name = sheet_name
@@ -529,6 +541,7 @@ class AssetList:
self.ecosurv = None
self.ecosurv_no_match = pd.DataFrame()
self.geographical_areas = pd.DataFrame()
+ self.openai_api_key = openai_api_key
# When this is True, we intend to break the programme into multiple phases. We may need to review
# how this is structured in the future, as depending on how we get future data, we may need to
@@ -1107,6 +1120,7 @@ class AssetList:
remapper = DataRemapper(
standard_values=config["standard_values"],
standard_map=config["standard_map"],
+ api_key=self.openai_api_key,
)
remap_dictionary = remapper.standardize_list(
values_to_remap=values_to_remap.tolist()
@@ -1296,8 +1310,8 @@ class AssetList:
self.standardised_asset_list[
self.ATTRIBUTE_HAS_SOLAR
] = self.standardised_asset_list[
- self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]
- ] | ~self.standardised_asset_list[
+ self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]
+ ] | ~self.standardised_asset_list[
self.EPC_API_DATA_NAMES["photo-supply"]
].isin(
["0.0", 0, None, "", np.nan]
@@ -1315,7 +1329,7 @@ class AssetList:
property_type=(
str(x[self.STANDARD_PROPERTY_TYPE]).title()
if str(x[self.STANDARD_PROPERTY_TYPE]).title()
- in accepted_epc_property_types
+ in accepted_epc_property_types
else (
x[self.EPC_API_DATA_NAMES["property-type"]]
if not pd.isnull(
@@ -1373,9 +1387,9 @@ class AssetList:
self.standardised_asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]]
- / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
+ / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]]
- / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
+ / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
),
axis=1,
)
@@ -1460,7 +1474,7 @@ class AssetList:
year_lower_bound = (
2007
if x[self.EPC_API_DATA_NAMES["construction-age-band"]]
- == "England and Wales: 2007 onwards"
+ == "England and Wales: 2007 onwards"
else 2012
)
@@ -1515,7 +1529,7 @@ class AssetList:
age_band_matches = (
"EPC Age Band Matches Year Built"
if x[self.STANDARD_YEAR_BUILT]
- == int(x[self.EPC_API_DATA_NAMES["construction-age-band"]])
+ == int(x[self.EPC_API_DATA_NAMES["construction-age-band"]])
else "EPC Age Band is different from Year Built"
)
@@ -1545,7 +1559,7 @@ class AssetList:
age_band_matches = (
"EPC Age Band Matches Year Built"
if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date))
- and (x[self.STANDARD_YEAR_BUILT] <= float(upper_date))
+ and (x[self.STANDARD_YEAR_BUILT] <= float(upper_date))
else (
"EPC Age Band is older than Year Built"
if x[self.STANDARD_YEAR_BUILT] > float(upper_date)
@@ -1717,22 +1731,22 @@ class AssetList:
if self.non_intrusives_present:
if self.new_format_non_insturives_present_v2:
non_intrusives_wall_filter = (
- self.standardised_asset_list["non-intrusives: Construction"]
- == "CAVITY"
- ) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
+ self.standardised_asset_list["non-intrusives: Construction"]
+ == "CAVITY"
+ ) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EMPTY", "PARTIAL", "EMPTY CAVITY"]
)
else:
non_intrusives_wall_filter = (
- self.standardised_asset_list["non-intrusives: Construction"]
- == "CAVITY"
- ) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
+ self.standardised_asset_list["non-intrusives: Construction"]
+ == "CAVITY"
+ ) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EMPTY", "PARTIAL"]
)
elif self.old_format_non_intrusives_present:
non_intrusives_wall_filter = self.standardised_asset_list[
- "non-intrusives: WFT Findings"
- ].str.lower().str.strip().isin(
+ "non-intrusives: WFT Findings"
+ ].str.lower().str.strip().isin(
[
"empty cavity",
"partial fill",
@@ -1742,18 +1756,18 @@ class AssetList:
"empty cav",
]
) | (
- (
- self.standardised_asset_list["non-intrusives: WFT Findings"]
- .str.lower()
- .str.strip()
- .str.contains("empty cavity|partial fill")
- & ~self.standardised_asset_list["non-intrusives: WFT Findings"]
- .astype(str)
- .str.lower()
- .str.strip()
- .str.contains("major access issues")
- )
- )
+ (
+ self.standardised_asset_list["non-intrusives: WFT Findings"]
+ .str.lower()
+ .str.strip()
+ .str.contains("empty cavity|partial fill")
+ & ~self.standardised_asset_list["non-intrusives: WFT Findings"]
+ .astype(str)
+ .str.lower()
+ .str.strip()
+ .str.contains("major access issues")
+ )
+ )
else:
# We set the filter to False, as we have no non-intrusives
non_intrusives_wall_filter = False
@@ -1765,12 +1779,12 @@ class AssetList:
)
else:
year_built_filter = (
- self.standardised_asset_list[self.STANDARD_YEAR_BUILT]
- <= self.EMPTY_CAVITY_YEAR_THRESHOLD
- ) | (
- self.standardised_asset_list["epc_year_upper_bound"]
- <= self.EMPTY_CAVITY_YEAR_THRESHOLD
- )
+ self.standardised_asset_list[self.STANDARD_YEAR_BUILT]
+ <= self.EMPTY_CAVITY_YEAR_THRESHOLD
+ ) | (
+ self.standardised_asset_list["epc_year_upper_bound"]
+ <= self.EMPTY_CAVITY_YEAR_THRESHOLD
+ )
# Criteria:
# The property isn't a bedsit
@@ -1811,8 +1825,8 @@ class AssetList:
] = (
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
& ~self.standardised_asset_list[
- "non_intrusive_indicates_empty_cavity_has_solar"
- ]
+ "non_intrusive_indicates_empty_cavity_has_solar"
+ ]
& (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
["bedsit"]
@@ -1888,8 +1902,8 @@ class AssetList:
.str.lower()
.isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS)
| self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
- ["uninsulated cavity"]
- )
+ ["uninsulated cavity"]
+ )
)
######################################################
@@ -1926,8 +1940,8 @@ class AssetList:
extraction_wall_filter = (
extraction_wall_filter
& ~self.standardised_asset_list[
- "non-intrusives: Eligibility (Red/Yellow/Green)"
- ].isin(["RED"])
+ "non-intrusives: Eligibility (Red/Yellow/Green)"
+ ].isin(["RED"])
)
self.standardised_asset_list[
@@ -2023,26 +2037,26 @@ class AssetList:
self.standardised_asset_list[
"solar_epc_data_indicates_correct_heating_system"
] = (
- self.standardised_asset_list[
- self.EPC_API_DATA_NAMES["mainheat-description"]
- ]
- .str.lower()
- .str.contains(
- "air source heat pump|ground source heat pump|boiler and radiators, electric"
- )
- ) | (
- self.standardised_asset_list[
- self.EPC_API_DATA_NAMES["mainheat-description"]
- ]
- .str.lower()
- .str.contains("electric storage heaters")
- & (
- self.standardised_asset_list[
- self.EPC_API_DATA_NAMES["mainheatcont-description"]
- ]
- == "Controls for high heat retention storage heaters"
- )
+ self.standardised_asset_list[
+ self.EPC_API_DATA_NAMES["mainheat-description"]
+ ]
+ .str.lower()
+ .str.contains(
+ "air source heat pump|ground source heat pump|boiler and radiators, electric"
)
+ ) | (
+ self.standardised_asset_list[
+ self.EPC_API_DATA_NAMES["mainheat-description"]
+ ]
+ .str.lower()
+ .str.contains("electric storage heaters")
+ & (
+ self.standardised_asset_list[
+ self.EPC_API_DATA_NAMES["mainheatcont-description"]
+ ]
+ == "Controls for high heat retention storage heaters"
+ )
+ )
# If the landlord has given us the heating system, we default to that on heating upgrades. Because of the
# poor heating in place, if the EPC indicates that this property had a low efficiency heating system but the
@@ -2050,25 +2064,25 @@ class AssetList:
self.standardised_asset_list[
"solar_epc_data_indicates_requires_heating_upgrade"
] = (
+ self.standardised_asset_list[
+ self.EPC_API_DATA_NAMES["mainheat-description"]
+ ]
+ .str.lower()
+ .str.contains("electric storage heaters|room heaters")
+ & (
self.standardised_asset_list[
- self.EPC_API_DATA_NAMES["mainheat-description"]
+ self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
- .str.lower()
- .str.contains("electric storage heaters|room heaters")
- & (
- self.standardised_asset_list[
- self.EPC_API_DATA_NAMES["mainheatcont-description"]
- ]
- != "Controls for high heat retention storage heaters"
- )
- ) & (
- ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
- ["district heating", "communal heating", "communal gas boiler"]
- )
- & ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
- .astype(str)
- .str.contains("gas ")
+ != "Controls for high heat retention storage heaters"
)
+ ) & (
+ ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
+ ["district heating", "communal heating", "communal gas boiler"]
+ )
+ & ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
+ .astype(str)
+ .str.contains("gas ")
+ )
# Basic check - both of the previous two shouldn't be true simultaneously
if (
@@ -2148,8 +2162,8 @@ class AssetList:
self.standardised_asset_list[
"solar_non_intrusives_walls_insulated"
] = self.standardised_asset_list[
- "non-intrusives: WFT Findings"
- ].str.lower().str.strip().isin(
+ "non-intrusives: WFT Findings"
+ ].str.lower().str.strip().isin(
[
"retro drilled",
"retro filled",
@@ -2158,8 +2172,8 @@ class AssetList:
"retro drilled and filled",
]
) | self.standardised_asset_list[
- "non-intrusives: WFT Findings"
- ].str.lower().str.strip().str.contains(
+ "non-intrusives: WFT Findings"
+ ].str.lower().str.strip().str.contains(
"retro drilled"
)
else:
@@ -2176,19 +2190,14 @@ class AssetList:
)
self.standardised_asset_list["solar_epc_walls_insulated"] = (
- self.standardised_asset_list[
- self.EPC_API_DATA_NAMES[
- "walls-description"]]
- .str.lower()
- .str.contains("|".join(
- self.EPC_INSULATED_WALLS_SUBSTRINGS))
- ) | (
- self.standardised_asset_list[
- "walls_u_value"].apply(
- lambda x: x <= 0.7 if not pd.isnull(
- x) else False
- )
- )
+ self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]]
+ .str.lower()
+ .str.contains("|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS))
+ ) | (
+ self.standardised_asset_list["walls_u_value"].apply(
+ lambda x: x <= 0.7 if not pd.isnull(x) else False
+ )
+ )
roof_data = []
for desc in self.standardised_asset_list[
@@ -2230,20 +2239,20 @@ class AssetList:
self.standardised_asset_list[
"solar_epc_loft_needs_topup"
] = self.standardised_asset_list[
- self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
- ].apply(
+ self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
+ ].apply(
lambda x: int(x) < 200 if str(x).isdigit() else False
) | (
- (
- self.standardised_asset_list["is_loft"]
- | self.standardised_asset_list["is_pitched"]
- )
- & (
- self.standardised_asset_list[
- self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
- ].isin(["below average", "none"])
- )
+ (
+ self.standardised_asset_list["is_loft"]
+ | self.standardised_asset_list["is_pitched"]
)
+ & (
+ self.standardised_asset_list[
+ self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
+ ].isin(["below average", "none"])
+ )
+ )
self.standardised_asset_list["epc_has_floor_recommendation"] = (
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
@@ -2252,16 +2261,15 @@ class AssetList:
# Check if the boiler is electric
# We check if it contains both the terms boiler & electric
self.standardised_asset_list["has_electric_boiler"] = (
- self.standardised_asset_list[
- self.EPC_API_DATA_NAMES["mainheat-description"]
- ]
- .str.lower()
- .isin(["boiler and radiators, electric"])
- ) | (
- self.standardised_asset_list[
- self.STANDARD_HEATING_SYSTEM]
- == "electric boiler"
- )
+ self.standardised_asset_list[
+ self.EPC_API_DATA_NAMES["mainheat-description"]
+ ]
+ .str.lower()
+ .isin(["boiler and radiators, electric"])
+ ) | (
+ self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
+ == "electric boiler"
+ )
####################################
# Check solar eligibility
@@ -2399,11 +2407,11 @@ class AssetList:
empty_cavity_map = {
"non_intrusive_indicates_empty_cavity": self.EMPTY_CAVITY_NON_INTRUSIVE
- + ": ",
+ + ": ",
"non_intrusive_indicates_empty_cavity_has_solar": f"{self.EMPTY_CAVITY_NON_INTRUSIVE} - property "
- "already has solar: ",
+ "already has solar: ",
"non_intrusive_indicates_empty_cavity_no_year_filter": f"{self.EMPTY_CAVITY_NON_INTRUSIVE}, "
- f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
+ f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
}
for variable, description in empty_cavity_map.items():
self.standardised_asset_list["cavity_reason"] = np.where(
@@ -2419,8 +2427,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
- "non_intrusive_indicates_empty_cavity"
- ]
+ "non_intrusive_indicates_empty_cavity"
+ ]
& (
self.standardised_asset_list["non-intrusives: WFT Findings"]
.str.lower()
@@ -2445,8 +2453,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
- "non_intrusive_indicates_empty_cavity"
- ]
+ "non_intrusive_indicates_empty_cavity"
+ ]
& self.standardised_asset_list[
"non_intrusive_indicates_cavity_extraction"
]
@@ -2461,8 +2469,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
- "non_intrusive_indicates_empty_cavity"
- ]
+ "non_intrusive_indicates_empty_cavity"
+ ]
& (
self.standardised_asset_list["non-intrusives: Insulated"]
== "RETRO DRILLED"
@@ -2478,8 +2486,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
- "non_intrusive_indicates_empty_cavity"
- ]
+ "non_intrusive_indicates_empty_cavity"
+ ]
& (
self.standardised_asset_list["non-intrusives: Insulated"]
== "FILLED AT BUILD"
@@ -2495,8 +2503,8 @@ class AssetList:
(
self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[
- "non_intrusive_indicates_empty_cavity"
- ]
+ "non_intrusive_indicates_empty_cavity"
+ ]
& pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY}: " + self.standardised_asset_list["SAP Category"],
@@ -2640,7 +2648,7 @@ class AssetList:
identified_work = self.standardised_asset_list[
~pd.isnull(self.standardised_asset_list["cavity_reason"])
| ~pd.isnull(self.standardised_asset_list["solar_reason"])
- ][self.DOMNA_PROPERTY_ID].values
+ ][self.DOMNA_PROPERTY_ID].values
if self.DOMNA_PROPERTY_ID in self.outcomes.columns:
self.outcomes_for_output = self.outcomes[
@@ -2675,12 +2683,12 @@ class AssetList:
blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
== "block of flats"
- ]
+ ]
non_blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
!= "block of flats"
- ]
+ ]
# Produce some aggregate figures
self.work_type_figures = {
@@ -2723,7 +2731,7 @@ class AssetList:
blocks = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
== "block of flats"
- ].copy()
+ ].copy()
if blocks.empty:
return
@@ -2860,7 +2868,7 @@ class AssetList:
self.standardised_asset_list = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
!= "block of flats"
- ]
+ ]
self.standardised_asset_list = pd.concat(
[self.standardised_asset_list, expanded_blocks], ignore_index=True
@@ -2940,7 +2948,7 @@ class AssetList:
# find any block refs with more than 50% emptires
viable_empty_blocks = self.block_analysis_df[
self.block_analysis_df["Percentage of Empties"] >= 0.50
- ]
+ ]
if not viable_empty_blocks.empty:
project_code_lookup = viable_empty_blocks[["Block Reference"]].copy()
@@ -3179,7 +3187,7 @@ class AssetList:
contact_details = pd.read_excel(local_filepath, sheet_name=sheet_name)[
[self.contact_detail_fields["landlord_property_id"]] + details_colnames
- ]
+ ]
contact_details = contact_details[
~pd.isnull(
contact_details[self.contact_detail_fields["landlord_property_id"]]
@@ -3572,13 +3580,10 @@ class AssetList:
"Non-Intrusives: Date Checked
": date_of_inspections,
"Non-Intrusives: Wall Type ": non_intrusives_construction,
"Non-intrusives: Insulation ": non_intrusives_insulated,
- "Non-intrusives: Insulation Material ":
- non_intrusives_insulation_material,
- "Non-Intrusives: CIGA Check Required ":
- non_intrusives_ciga_check_required,
+ "Non-intrusives: Insulation Material ": non_intrusives_insulation_material,
+ "Non-Intrusives: CIGA Check Required ": non_intrusives_ciga_check_required,
"Non-Intrusives: PV Access Issues ": non_intrusives_pv_access,
- "Non-Intrusives: Roof Orientation ":
- non_intrusives_roof_orientation,
+ "Non-Intrusives: Roof Orientation ": non_intrusives_roof_orientation,
"Non-Intrusives: Surveyor Notes ": non_intrusives_surveyor_notes,
"Non-Intrusives: Surveyor Name ": non_intrusives_surveyor_name,
"CIGA: Date Requested ": None, # TODO: Don't have this for the moment
@@ -3755,8 +3760,8 @@ class AssetList:
# We compare address line 1 to full address
if any(
df[self.STANDARD_FULL_ADDRESS]
- .str.lower()
- .str.contains(row["Address Line 1"].lower(), na=False)
+ .str.lower()
+ .str.contains(row["Address Line 1"].lower(), na=False)
):
df = df[
df[self.STANDARD_FULL_ADDRESS]
@@ -3996,7 +4001,7 @@ class AssetList:
matched = matched[
matched["houseno"].astype(str) == house_no_to_match
- ]
+ ]
if matched.shape[0] == 1:
lookup_i.append(
{
@@ -4021,7 +4026,7 @@ class AssetList:
)[0]
matched = matched[
matched[self.STANDARD_FULL_ADDRESS] == best_match
- ]
+ ]
lookup_i.append(
{
"row_id": x["row_id"],
@@ -4332,7 +4337,7 @@ class AssetList:
df = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID]
== row[master_id_colnames[idx]]
- ]
+ ]
if df.shape[0] == 1:
matched.append(
{
@@ -4438,7 +4443,7 @@ class AssetList:
)[1]
)
> 90
- ]
+ ]
if df.shape[0] == 0:
unmatched.append(row["row_id"])
@@ -4446,8 +4451,8 @@ class AssetList:
if any(
df[self.STANDARD_FULL_ADDRESS]
- .str.lower()
- .str.contains(
+ .str.lower()
+ .str.contains(
" ".join(
[row[house_no_col], row["Street / Block Name"]]
).lower()
@@ -4474,7 +4479,7 @@ class AssetList:
row[property_type_col].split(" ")[-1].lower()
)
& (df[self.STANDARD_PROPERTY_TYPE] != "block of flats")
- ]
+ ]
if df.shape[0] != 1:
# We have multiple matches - it's likely because the landlord has a duplicate
diff --git a/asset_list/app.py b/asset_list/app.py
index 49ec48a0..7413c7cb 100644
--- a/asset_list/app.py
+++ b/asset_list/app.py
@@ -21,6 +21,11 @@ EPC_AUTH_TOKEN = os.getenv(
OPENAI_API_KEY = os.getenv(
"OPENAI_API_KEY",
)
+print(
+ f"[debug] OPENAI_API_KEY loaded: "
+ f"{OPENAI_API_KEY[:8]}...{OPENAI_API_KEY[-4:] if OPENAI_API_KEY else 'NONE'} "
+ f"(len={len(OPENAI_API_KEY) if OPENAI_API_KEY else 0})"
+)
def extract_address1(
@@ -74,23 +79,23 @@ def app():
"""
data_folder = "/workspaces/model/asset_list"
- data_filename = "2026-04-22T08_22_00.779745_61049fd3.xlsx"
- sheet_name = "in"
- postcode_column = "postcode_clean"
- address1_column = "address2uprn_address"
+ data_filename = "input.xlsx"
+ sheet_name = "Handovers"
+ postcode_column = "POSTCODE"
+ address1_column = "Full Addres"
address1_method = None
- fulladdress_column = "address2uprn_address"
+ fulladdress_column = "Full Addres"
address_cols_to_concat = []
missing_postcodes_method = None
landlord_year_built = None
- landlord_os_uprn = "address2uprn_uprn"
- landlord_property_type = "Property Type" # Good to include if landlord gave
- landlord_built_form = "Built Form" # Good to include if landlord gave
+ landlord_os_uprn = "domna_found_uprn"
+ landlord_property_type = "PROPERTY TYPE" # Good to include if landlord gave
+ landlord_built_form = "Type Description" # Good to include if landlord gave
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
- landlord_property_id = "UPRN"
+ landlord_property_id = "PROP REF"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@@ -131,6 +136,7 @@ def app():
landlord_sap=landlord_sap,
landlord_block_reference=landlord_block_reference,
phase=phase,
+ openai_api_key=OPENAI_API_KEY,
)
asset_list.init_standardise()
@@ -462,3 +468,9 @@ def app():
asset_list.duplicated_addresses.to_excel(
writer, sheet_name="Duplicate Properties", index=False
)
+
+
+
+
+for key,value in dict.items():
+ lsakjfldsa
\ No newline at end of file
diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py
index 28ad344f..b83c7a58 100644
--- a/backend/address2UPRN/main.py
+++ b/backend/address2UPRN/main.py
@@ -17,16 +17,12 @@ from utils.s3 import (
from datetime import datetime
from backend.utils.addressMatch import AddressMatch
-
-logger = setup_logger()
-
-
-EPC_AUTH_TOKEN = os.getenv(
- "EPC_AUTH_TOKEN",
+from backend.address2UPRN.scoring import ( # noqa: F401 (re-exported)
+ df_has_single_uprn,
+ get_uprn_candidates,
)
-if EPC_AUTH_TOKEN is None:
- raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
+logger = setup_logger()
def score_addresses(
@@ -45,7 +41,10 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
Recursively fetch EPC data by postcode.
If results hit the size limit, retry with double size up to max_attempts.
"""
- client = EpcClient(auth_token=EPC_AUTH_TOKEN)
+ auth_token = os.getenv("EPC_AUTH_TOKEN")
+ if auth_token is None:
+ raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
+ client = EpcClient(auth_token=auth_token)
url = os.path.join(client.domestic.host, "search")
@@ -88,65 +87,6 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
return results_df
-def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
- """
- Returns True if all non-null UPRNs in df match the given uprn.
- Returns False otherwise.
- """
-
- if column not in df.columns:
- return False
-
- # Drop nulls and normalise to string
- uprns = df[column].dropna().astype(str).str.strip().unique()
-
- # No valid UPRNs to compare
- if len(uprns) == 0:
- return False
-
- # Exactly one unique UPRN and it matches
- return len(uprns) == 1 and uprns[0] == str(uprn)
-
-
-def get_uprn_candidates(
- df: pd.DataFrame,
- user_address: str,
- address_column: str = "address",
- uprn_column: str = "uprn",
-) -> pd.DataFrame:
- """
- Annotate EPC results with lexicographical similarity scores and ranks.
-
- Returns a DataFrame sorted by descending lexiscore.
- DOES NOT choose or return a UPRN.
- """
-
- if address_column not in df.columns:
- raise ValueError(f"Missing column: {address_column}")
-
- if uprn_column not in df.columns:
- raise ValueError(f"Missing column: {uprn_column}")
-
- out = df.copy()
-
- user_norm = AddressMatch.normalise_address(user_address)
-
- out["lexiscore"] = out[address_column].apply(
- lambda x: AddressMatch.levenshtein(user_norm, x)
- )
-
- # Normalise UPRN to string
- out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
-
- # Rank: 1 = best match
- out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)
-
- return out.sort_values(
- ["lexirank", "lexiscore"],
- ascending=[True, False],
- )
-
-
def get_uprn_with_epc_df(
user_inputed_address: str,
epc_df: pd.DataFrame,
diff --git a/backend/address2UPRN/scoring.py b/backend/address2UPRN/scoring.py
new file mode 100644
index 00000000..d31b9aea
--- /dev/null
+++ b/backend/address2UPRN/scoring.py
@@ -0,0 +1,57 @@
+import pandas as pd
+
+from backend.utils.addressMatch import AddressMatch
+
+
+def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
+ """
+ Returns True if all non-null UPRNs in df match the given uprn.
+ Returns False otherwise.
+ """
+
+ if column not in df.columns:
+ return False
+
+ uprns = df[column].dropna().astype(str).str.strip().unique()
+
+ if len(uprns) == 0:
+ return False
+
+ return len(uprns) == 1 and uprns[0] == str(uprn)
+
+
+def get_uprn_candidates(
+ df: pd.DataFrame,
+ user_address: str,
+ address_column: str = "address",
+ uprn_column: str = "uprn",
+) -> pd.DataFrame:
+ """
+ Annotate EPC results with lexicographical similarity scores and ranks.
+
+ Returns a DataFrame sorted by descending lexiscore.
+ DOES NOT choose or return a UPRN.
+ """
+
+ if address_column not in df.columns:
+ raise ValueError(f"Missing column: {address_column}")
+
+ if uprn_column not in df.columns:
+ raise ValueError(f"Missing column: {uprn_column}")
+
+ out = df.copy()
+
+ user_norm = AddressMatch.normalise_address(user_address)
+
+ out["lexiscore"] = out[address_column].apply(
+ lambda x: AddressMatch.levenshtein(user_norm, x)
+ )
+
+ out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
+
+ out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)
+
+ return out.sort_values(
+ ["lexirank", "lexiscore"],
+ ascending=[True, False],
+ )
diff --git a/backend/app/config.py b/backend/app/config.py
index 21f12902..2a83387b 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -80,6 +80,7 @@ class Settings(BaseSettings):
OSMOSIS_ACD_SHAREPOINT_ID: Optional[str] = None
PRIVATE_PAY_SHAREPOINT_ID: Optional[str] = None
SOCIAL_HOUSING_WAVE_3_SHAREPOINT_ID: Optional[str] = None
+ OPENAI_API_KEY: Optional[str] = None
# Pas Hub
PASHUB_EMAIL: Optional[str] = None
diff --git a/backend/app/local/router.py b/backend/app/local/router.py
index 0977be04..ea04dc49 100644
--- a/backend/app/local/router.py
+++ b/backend/app/local/router.py
@@ -2,8 +2,8 @@ from fastapi import APIRouter, HTTPException, status
from jose import jwt, jwe
import json
import datetime
-from app.config import get_settings
-from app.dependencies import get_derived_encryption_key
+from backend.app.config import get_settings
+from backend.app.dependencies import get_derived_encryption_key
router = APIRouter(
prefix="/local",
@@ -27,7 +27,12 @@ def create_dummy_token(secret: str) -> str:
"dbId": "known_id",
}
- token = jwe.encrypt(json.dumps(claims), get_derived_encryption_key(secret), algorithm="dir", encryption="A256GCM")
+ token = jwe.encrypt(
+ json.dumps(claims),
+ get_derived_encryption_key(secret),
+ algorithm="dir",
+ encryption="A256GCM",
+ )
return token
@@ -40,6 +45,8 @@ async def dummy_token():
async def dummy_token():
settings = get_settings()
if settings.ENVIRONMENT != "local":
- raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
- detail="Dummy token can only be generated in local environment")
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="Dummy token can only be generated in local environment",
+ )
return {"dummy_token": create_dummy_token(settings.SECRET_KEY)}
diff --git a/backend/app/main.py b/backend/app/main.py
index c9733c18..55dfef7d 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -30,10 +30,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
logger.error(f"Validation Errors: {exc.errors()}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
- content=jsonable_encoder({
- "detail": exc.errors(),
- "body": exc.body
- }),
+ content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
)
@@ -63,7 +60,8 @@ app.include_router(tasks_router.router, prefix="/v1")
app.include_router(bulk_uploads_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local":
- from app.local import router as local_router
+ from backend.app.local import router as local_router
+
app.include_router(local_router.router)
handler = Mangum(app)
@@ -98,10 +96,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
logger.error(f"Validation Errors: {exc.errors()}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
- content=jsonable_encoder({
- "detail": exc.errors(),
- "body": exc.body
- }),
+ content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
)
@@ -130,7 +125,8 @@ app.include_router(whlg_router.router, prefix="/v1")
app.include_router(bulk_uploads_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local":
- from app.local import router as local_router
+ from backend.app.local import router as local_router
+
app.include_router(local_router.router)
handler = Mangum(app)
diff --git a/backend/etl/__init__.py b/backend/etl/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/backend/etl/etl_opendatacommunities/README.md b/backend/etl/etl_opendatacommunities/README.md
new file mode 100644
index 00000000..728ac468
--- /dev/null
+++ b/backend/etl/etl_opendatacommunities/README.md
@@ -0,0 +1,14 @@
+This website https://epc.opendatacommunities.org/ has closed down on 30th May 2026
+
+So we downloaded the data and moved everything to S3 ( s3://retrofit-data-dev/histroical_epc/0_master_backup/ )
+
+This scripts assumes the following:
+
+1) You downloaded the master copy, uncompressed it and set it to a path so we can read the csv
+
+
+The script funciton is:
+
+1) reads csv for all data, seperate each iteration by postcode
+2) compresses the csv and save it in the location
+3) location s3://retrofit-data-dev/historical_epc//compressed data.csv
\ No newline at end of file
diff --git a/backend/etl/etl_opendatacommunities/__init__.py b/backend/etl/etl_opendatacommunities/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/backend/etl/etl_opendatacommunities/main.py b/backend/etl/etl_opendatacommunities/main.py
new file mode 100644
index 00000000..2bd41005
--- /dev/null
+++ b/backend/etl/etl_opendatacommunities/main.py
@@ -0,0 +1,133 @@
+from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
+from io import BytesIO
+from pathlib import Path
+from typing import Any
+
+import boto3
+import pandas as pd
+from botocore.config import Config
+from tqdm import tqdm
+
+from utils.logger import setup_logger
+
+logger = setup_logger()
+
+SRC_ROOT = Path("/workspaces/home/epc_data")
+TMP_ROOT = Path("/tmp/epc_postcodes")
+S3_BUCKET = "retrofit-data-dev"
+S3_PREFIX = "historical_epc"
+
+# This scripts assume you downloading the zip, unzip it, and running it locally
+
+
+def sanitise(pc: pd.Series) -> pd.Series:
+ return pc.astype("string").str.upper().str.replace(" ", "", regex=False)
+
+
+def shard_la(la_dir: Path) -> None:
+ certs = pd.read_csv(la_dir / "certificates.csv", low_memory=False)
+
+ certs["POSTCODE_CLEAN"] = sanitise(certs["POSTCODE"])
+ before = len(certs)
+ certs = certs.dropna(subset=["POSTCODE_CLEAN"])
+ certs = certs[certs["POSTCODE_CLEAN"] != ""]
+ dropped = before - len(certs)
+ if dropped:
+ logger.warning(f"{la_dir.name}: dropped {dropped} rows with empty postcode")
+
+ for pc, group in certs.groupby("POSTCODE_CLEAN", sort=False):
+ out = TMP_ROOT / f"{pc}.csv"
+ group.drop(columns=["POSTCODE_CLEAN"]).to_csv(
+ out, mode="a", header=not out.exists(), index=False
+ )
+
+
+def list_existing_keys(s3: Any) -> set[str]:
+ existing: set[str] = set()
+ paginator = s3.get_paginator("list_objects_v2")
+ pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=f"{S3_PREFIX}/")
+ for page in tqdm(pages, desc="list s3"):
+ for obj in page.get("Contents", []):
+ existing.add(obj["Key"])
+ logger.info(f"Found {len(existing)} existing objects under {S3_PREFIX}/")
+ return existing
+
+
+def upload_postcode(path: Path, s3: Any) -> None:
+ df = pd.read_csv(path, low_memory=False).drop_duplicates()
+
+ dupes = df["LMK_KEY"].value_counts()
+ bad = dupes[dupes > 1]
+ if not bad.empty:
+ raise ValueError(
+ f"Postcode {path.stem}: LMK_KEY appears with conflicting cert data: "
+ f"{bad.index.tolist()[:5]}"
+ )
+
+ buf = BytesIO()
+ df.to_csv(buf, index=False, compression="gzip")
+ s3.put_object(
+ Bucket=S3_BUCKET,
+ Key=f"{S3_PREFIX}/{path.stem}/data.csv.gz",
+ Body=buf.getvalue(),
+ ContentType="text/csv",
+ ContentEncoding="gzip",
+ )
+
+
+def main():
+ TMP_ROOT.mkdir(parents=True, exist_ok=True)
+ la_dirs = sorted(
+ p for p in SRC_ROOT.iterdir() if p.is_dir() and p.name.startswith("domestic-")
+ )
+ logger.info(f"Sharding {len(la_dirs)} LA folders -> {TMP_ROOT}")
+
+ for la in tqdm(la_dirs, desc="shard"):
+ shard_la(la)
+
+ s3 = boto3.client(
+ "s3",
+ config=Config(
+ max_pool_connections=512, retries={"max_attempts": 5, "mode": "standard"}
+ ),
+ )
+ pc_files = sorted(TMP_ROOT.glob("*.csv"))
+ logger.info(f"Found {len(pc_files)} local shards")
+
+ existing = list_existing_keys(s3)
+ todo = [p for p in pc_files if f"{S3_PREFIX}/{p.stem}/data.csv.gz" not in existing]
+ skipped = len(pc_files) - len(todo)
+ logger.info(
+ f"Uploading {len(todo)} shards (skipping {skipped} already in S3) -> "
+ f"s3://{S3_BUCKET}/{S3_PREFIX}/"
+ )
+
+ workers = 256
+ todo_iter = iter(todo)
+ inflight: dict[Any, Path] = {}
+ pbar = tqdm(total=len(todo), desc="upload")
+ with ThreadPoolExecutor(max_workers=workers) as pool:
+ for _ in range(workers * 2):
+ pc = next(todo_iter, None)
+ if pc is None:
+ break
+ inflight[pool.submit(upload_postcode, pc, s3)] = pc
+
+ while inflight:
+ done, _ = wait(inflight.keys(), return_when=FIRST_COMPLETED)
+ for fut in done:
+ pc = inflight.pop(fut)
+ try:
+ fut.result()
+ except Exception as e:
+ logger.error(f"{pc.name}: {e}")
+ raise
+ pbar.update(1)
+ nxt = next(todo_iter, None)
+ if nxt is not None:
+ inflight[pool.submit(upload_postcode, nxt, s3)] = nxt
+ pbar.close()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/datatypes/epc/domain/historic_epc.py b/datatypes/epc/domain/historic_epc.py
new file mode 100644
index 00000000..f64ab8c4
--- /dev/null
+++ b/datatypes/epc/domain/historic_epc.py
@@ -0,0 +1,98 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class HistoricEpc:
+ lmk_key: str
+ address1: str
+ address2: str
+ address3: str
+ postcode: str
+ building_reference_number: str
+ current_energy_rating: str
+ potential_energy_rating: str
+ current_energy_efficiency: str
+ potential_energy_efficiency: str
+ property_type: str
+ built_form: str
+ inspection_date: str
+ local_authority: str
+ constituency: str
+ county: str
+ lodgement_date: str
+ transaction_type: str
+ environment_impact_current: str
+ environment_impact_potential: str
+ energy_consumption_current: str
+ energy_consumption_potential: str
+ co2_emissions_current: str
+ co2_emiss_curr_per_floor_area: str
+ co2_emissions_potential: str
+ lighting_cost_current: str
+ lighting_cost_potential: str
+ heating_cost_current: str
+ heating_cost_potential: str
+ hot_water_cost_current: str
+ hot_water_cost_potential: str
+ total_floor_area: str
+ energy_tariff: str
+ mains_gas_flag: str
+ floor_level: str
+ flat_top_storey: str
+ flat_storey_count: str
+ main_heating_controls: str
+ multi_glaze_proportion: str
+ glazed_type: str
+ glazed_area: str
+ extension_count: str
+ number_habitable_rooms: str
+ number_heated_rooms: str
+ low_energy_lighting: str
+ number_open_fireplaces: str
+ hotwater_description: str
+ hot_water_energy_eff: str
+ hot_water_env_eff: str
+ floor_description: str
+ floor_energy_eff: str
+ floor_env_eff: str
+ windows_description: str
+ windows_energy_eff: str
+ windows_env_eff: str
+ walls_description: str
+ walls_energy_eff: str
+ walls_env_eff: str
+ secondheat_description: str
+ sheating_energy_eff: str
+ sheating_env_eff: str
+ roof_description: str
+ roof_energy_eff: str
+ roof_env_eff: str
+ mainheat_description: str
+ mainheat_energy_eff: str
+ mainheat_env_eff: str
+ mainheatcont_description: str
+ mainheatc_energy_eff: str
+ mainheatc_env_eff: str
+ lighting_description: str
+ lighting_energy_eff: str
+ lighting_env_eff: str
+ main_fuel: str
+ wind_turbine_count: str
+ heat_loss_corridor: str
+ unheated_corridor_length: str
+ floor_height: str
+ photo_supply: str
+ solar_water_heating_flag: str
+ mechanical_ventilation: str
+ address: str
+ local_authority_label: str
+ constituency_label: str
+ posttown: str
+ construction_age_band: str
+ lodgement_datetime: str
+ tenure: str
+ fixed_lighting_outlets_count: str
+ low_energy_fixed_light_count: str
+ uprn: str
+ uprn_source: str
+ report_type: str
diff --git a/datatypes/epc/domain/historic_epc_matching.py b/datatypes/epc/domain/historic_epc_matching.py
new file mode 100644
index 00000000..95ca9d9f
--- /dev/null
+++ b/datatypes/epc/domain/historic_epc_matching.py
@@ -0,0 +1,104 @@
+from dataclasses import dataclass
+from typing import Optional
+
+import pandas as pd
+from botocore.exceptions import ClientError
+
+from backend.address2UPRN.scoring import get_uprn_candidates
+from backend.utils.addressMatch import AddressMatch
+from datatypes.epc.domain.historic_epc import HistoricEpc
+from utils.pandas_utils import pandas_cell_to_str
+from utils.s3 import parse_s3_uri, read_csv_gz_from_s3
+
+DEFAULT_S3_ROOT = "s3://retrofit-data-dev/historical_epc"
+
+_EXTRA_COLS = {"lexiscore", "lexirank"}
+
+
+def _map_historic_epc_pandas_row_to_domain(row: pd.Series) -> HistoricEpc:
+ kwargs = {
+ col.lower(): pandas_cell_to_str(val)
+ for col, val in row.items()
+ if col.lower() not in _EXTRA_COLS
+ }
+ return HistoricEpc(**kwargs)
+
+
+@dataclass(frozen=True)
+class ScoredHistoricEpc:
+ record: HistoricEpc
+ lexiscore: float
+ lexirank: int
+
+
+@dataclass
+class HistoricEpcMatches:
+ user_address: str
+ postcode: str
+ matches: list[ScoredHistoricEpc]
+
+ def top(self) -> Optional[ScoredHistoricEpc]:
+ return self.matches[0] if self.matches else None
+
+ def top_n(self, k: int) -> list[ScoredHistoricEpc]:
+ return self.matches[:k]
+
+ def unambiguous_uprn(self) -> Optional[str]:
+ top = self.top()
+ if top is None or top.lexiscore <= 0:
+ return None
+ rank1 = [m for m in self.matches if m.lexirank == top.lexirank]
+ uprns = {m.record.uprn for m in rank1 if m.record.uprn}
+ return next(iter(uprns)) if len(uprns) == 1 else None
+
+
+def _sanitise_postcode(postcode: str) -> str:
+ cleaned = (postcode or "").upper().replace(" ", "")
+ if not cleaned:
+ raise ValueError("postcode must contain non-whitespace characters")
+ if not AddressMatch.is_valid_postcode(cleaned):
+ raise ValueError(f"postcode {cleaned!r} is not a valid UK postcode")
+ return cleaned
+
+
+def match_addresses_for_postcode(
+ user_address: str,
+ postcode: str,
+ *,
+ s3_root: str = DEFAULT_S3_ROOT,
+ address_column: str = "ADDRESS",
+ uprn_column: str = "UPRN",
+) -> HistoricEpcMatches:
+ if not user_address:
+ raise ValueError("user_address must be non-empty")
+
+ pc = _sanitise_postcode(postcode)
+ bucket, root_prefix = parse_s3_uri(s3_root)
+ key = f"{root_prefix.rstrip('/')}/{pc}/data.csv.gz"
+
+ try:
+ df = read_csv_gz_from_s3(bucket, key)
+ except ClientError as e:
+ if e.response.get("Error", {}).get("Code") in ("NoSuchKey", "404"):
+ raise FileNotFoundError(
+ f"No historic EPC data at s3://{bucket}/{key}"
+ ) from e
+ raise
+
+ scored = get_uprn_candidates(
+ df,
+ user_address=user_address,
+ address_column=address_column,
+ uprn_column=uprn_column,
+ )
+
+ matches = [
+ ScoredHistoricEpc(
+ record=_map_historic_epc_pandas_row_to_domain(row),
+ lexiscore=float(row["lexiscore"]),
+ lexirank=int(row["lexirank"]),
+ )
+ for _, row in scored.iterrows()
+ ]
+
+ return HistoricEpcMatches(user_address=user_address, postcode=pc, matches=matches)
diff --git a/datatypes/epc/domain/tests/test_historic_epc_matching.py b/datatypes/epc/domain/tests/test_historic_epc_matching.py
new file mode 100644
index 00000000..1c3ee6d4
--- /dev/null
+++ b/datatypes/epc/domain/tests/test_historic_epc_matching.py
@@ -0,0 +1,239 @@
+from unittest.mock import patch
+
+import numpy as np
+import pandas as pd
+import pytest
+from botocore.exceptions import ClientError
+
+from datatypes.epc.domain import historic_epc_matching as matcher_mod
+from datatypes.epc.domain.historic_epc_matching import (
+ HistoricEpcMatches,
+ ScoredHistoricEpc,
+ _sanitise_postcode,
+ match_addresses_for_postcode,
+)
+
+
+# Columns required by the HistoricEpc dataclass (lower-cased CSV columns).
+# The matcher only reads ADDRESS + UPRN to score; everything else is filled
+# with "" but must be present for HistoricEpc(**kwargs) to construct.
+_FULL_COLUMN_FIELDS = [
+ "LMK_KEY", "ADDRESS1", "ADDRESS2", "ADDRESS3", "POSTCODE",
+ "BUILDING_REFERENCE_NUMBER", "CURRENT_ENERGY_RATING", "POTENTIAL_ENERGY_RATING",
+ "CURRENT_ENERGY_EFFICIENCY", "POTENTIAL_ENERGY_EFFICIENCY", "PROPERTY_TYPE",
+ "BUILT_FORM", "INSPECTION_DATE", "LOCAL_AUTHORITY", "CONSTITUENCY", "COUNTY",
+ "LODGEMENT_DATE", "TRANSACTION_TYPE", "ENVIRONMENT_IMPACT_CURRENT",
+ "ENVIRONMENT_IMPACT_POTENTIAL", "ENERGY_CONSUMPTION_CURRENT",
+ "ENERGY_CONSUMPTION_POTENTIAL", "CO2_EMISSIONS_CURRENT",
+ "CO2_EMISS_CURR_PER_FLOOR_AREA", "CO2_EMISSIONS_POTENTIAL",
+ "LIGHTING_COST_CURRENT", "LIGHTING_COST_POTENTIAL", "HEATING_COST_CURRENT",
+ "HEATING_COST_POTENTIAL", "HOT_WATER_COST_CURRENT", "HOT_WATER_COST_POTENTIAL",
+ "TOTAL_FLOOR_AREA", "ENERGY_TARIFF", "MAINS_GAS_FLAG", "FLOOR_LEVEL",
+ "FLAT_TOP_STOREY", "FLAT_STOREY_COUNT", "MAIN_HEATING_CONTROLS",
+ "MULTI_GLAZE_PROPORTION", "GLAZED_TYPE", "GLAZED_AREA", "EXTENSION_COUNT",
+ "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "LOW_ENERGY_LIGHTING",
+ "NUMBER_OPEN_FIREPLACES", "HOTWATER_DESCRIPTION", "HOT_WATER_ENERGY_EFF",
+ "HOT_WATER_ENV_EFF", "FLOOR_DESCRIPTION", "FLOOR_ENERGY_EFF", "FLOOR_ENV_EFF",
+ "WINDOWS_DESCRIPTION", "WINDOWS_ENERGY_EFF", "WINDOWS_ENV_EFF",
+ "WALLS_DESCRIPTION", "WALLS_ENERGY_EFF", "WALLS_ENV_EFF",
+ "SECONDHEAT_DESCRIPTION", "SHEATING_ENERGY_EFF", "SHEATING_ENV_EFF",
+ "ROOF_DESCRIPTION", "ROOF_ENERGY_EFF", "ROOF_ENV_EFF", "MAINHEAT_DESCRIPTION",
+ "MAINHEAT_ENERGY_EFF", "MAINHEAT_ENV_EFF", "MAINHEATCONT_DESCRIPTION",
+ "MAINHEATC_ENERGY_EFF", "MAINHEATC_ENV_EFF", "LIGHTING_DESCRIPTION",
+ "LIGHTING_ENERGY_EFF", "LIGHTING_ENV_EFF", "MAIN_FUEL", "WIND_TURBINE_COUNT",
+ "HEAT_LOSS_CORRIDOR", "UNHEATED_CORRIDOR_LENGTH", "FLOOR_HEIGHT",
+ "PHOTO_SUPPLY", "SOLAR_WATER_HEATING_FLAG", "MECHANICAL_VENTILATION",
+ "ADDRESS", "LOCAL_AUTHORITY_LABEL", "CONSTITUENCY_LABEL", "POSTTOWN",
+ "CONSTRUCTION_AGE_BAND", "LODGEMENT_DATETIME", "TENURE",
+ "FIXED_LIGHTING_OUTLETS_COUNT", "LOW_ENERGY_FIXED_LIGHT_COUNT", "UPRN",
+ "UPRN_SOURCE", "REPORT_TYPE",
+]
+
+
+def _row(address: str, uprn) -> dict:
+ row = {col: "" for col in _FULL_COLUMN_FIELDS}
+ row["ADDRESS"] = address
+ row["UPRN"] = uprn
+ return row
+
+
+def _build_df(rows: list[dict]) -> pd.DataFrame:
+ return pd.DataFrame(rows, columns=_FULL_COLUMN_FIELDS)
+
+
+@pytest.fixture
+def patch_postcode_valid():
+ with patch.object(matcher_mod.AddressMatch, "is_valid_postcode", return_value=True) as m:
+ yield m
+
+
+@pytest.fixture
+def patch_read():
+ with patch.object(matcher_mod, "read_csv_gz_from_s3") as m:
+ yield m
+
+
+# ---------- _sanitise_postcode ----------
+
+
+class TestSanitisePostcode:
+
+ def test_uppercases_and_strips_spaces(self, patch_postcode_valid):
+ assert _sanitise_postcode("ab33 8al") == "AB338AL"
+
+ def test_empty_raises(self, patch_postcode_valid):
+ with pytest.raises(ValueError, match="non-whitespace"):
+ _sanitise_postcode("")
+
+ def test_whitespace_only_raises(self, patch_postcode_valid):
+ with pytest.raises(ValueError, match="non-whitespace"):
+ _sanitise_postcode(" ")
+
+ def test_invalid_postcode_raises(self):
+ with patch.object(
+ matcher_mod.AddressMatch, "is_valid_postcode", return_value=False
+ ):
+ with pytest.raises(ValueError, match="not a valid UK postcode"):
+ _sanitise_postcode("NONSENSE")
+
+
+# ---------- match_addresses_for_postcode ----------
+
+
+class TestMatchAddressesForPostcode:
+
+ def test_preserves_row_count_including_zero_score_rows(
+ self, patch_read, patch_postcode_valid
+ ):
+ # Disjoint number sets => hard zero. Still kept in matches.
+ patch_read.return_value = _build_df([
+ _row("47 GORDON ROAD", "100"),
+ _row("999 SOMEWHERE ELSE", "200"),
+ ])
+ result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ assert isinstance(result, HistoricEpcMatches)
+ assert len(result.matches) == 2
+
+ def test_top_has_lexirank_one_and_lexiscore_monotone(
+ self, patch_read, patch_postcode_valid
+ ):
+ patch_read.return_value = _build_df([
+ _row("48 GORDON ROAD", "200"), # near miss
+ _row("47 GORDON ROAD", "100"), # exact (after normalisation)
+ ])
+ result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ assert result.top().lexirank == 1
+ scores = [m.lexiscore for m in result.matches]
+ assert scores == sorted(scores, reverse=True)
+
+ def test_s3_key_built_from_default_root(self, patch_read, patch_postcode_valid):
+ patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")])
+ match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ patch_read.assert_called_once_with(
+ "retrofit-data-dev", "historical_epc/AB338AL/data.csv.gz"
+ )
+
+ def test_s3_key_respects_custom_root_with_trailing_slash(
+ self, patch_read, patch_postcode_valid
+ ):
+ patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")])
+ match_addresses_for_postcode(
+ "47 Gordon Road",
+ "AB33 8AL",
+ s3_root="s3://my-bucket/some/prefix/",
+ )
+ patch_read.assert_called_once_with(
+ "my-bucket", "some/prefix/AB338AL/data.csv.gz"
+ )
+
+ def test_no_such_key_translates_to_filenotfound(
+ self, patch_read, patch_postcode_valid
+ ):
+ patch_read.side_effect = ClientError(
+ {"Error": {"Code": "NoSuchKey", "Message": "missing"}}, "GetObject"
+ )
+ with pytest.raises(FileNotFoundError):
+ match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+
+ def test_other_client_error_propagates(self, patch_read, patch_postcode_valid):
+ patch_read.side_effect = ClientError(
+ {"Error": {"Code": "AccessDenied", "Message": "nope"}}, "GetObject"
+ )
+ with pytest.raises(ClientError):
+ match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+
+ def test_empty_user_address_raises(self, patch_postcode_valid):
+ with pytest.raises(ValueError, match="user_address"):
+ match_addresses_for_postcode("", "AB33 8AL")
+
+
+# ---------- unambiguous_uprn ----------
+
+
+class TestUnambiguousUprn:
+
+ def test_exact_match_returns_uprn(self, patch_read, patch_postcode_valid):
+ patch_read.return_value = _build_df([
+ _row("47 GORDON ROAD", "100"),
+ _row("48 GORDON ROAD", "200"),
+ ])
+ result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ assert result.unambiguous_uprn() == "100"
+
+ def test_ambiguous_tie_returns_none(self, patch_read, patch_postcode_valid):
+ # Two duplicate addresses with different UPRNs share rank-1.
+ patch_read.return_value = _build_df([
+ _row("47 GORDON ROAD", "100"),
+ _row("47 GORDON ROAD", "200"),
+ ])
+ result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ assert result.unambiguous_uprn() is None
+
+ def test_all_zero_score_returns_none_even_when_uprn_unique(
+ self, patch_read, patch_postcode_valid
+ ):
+ # User address has building number 47; no row has 47 -> all hard-zero.
+ patch_read.return_value = _build_df([
+ _row("999 ELSEWHERE", "100"),
+ _row("888 ELSEWHERE", "200"),
+ ])
+ result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ assert all(m.lexiscore == 0.0 for m in result.matches)
+ assert result.unambiguous_uprn() is None
+
+ def test_nan_uprn_becomes_empty_string_not_nan(
+ self, patch_read, patch_postcode_valid
+ ):
+ # Use a real NaN in the UPRN cell.
+ patch_read.return_value = _build_df([
+ _row("47 GORDON ROAD", np.nan),
+ _row("48 GORDON ROAD", "200"),
+ ])
+ result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ top = result.top()
+ # pandas_cell_to_str must turn NaN/"nan" into "" (not the literal string "nan"),
+ # so unambiguous_uprn's truthiness check correctly drops the row.
+ assert top.record.uprn == ""
+
+
+# ---------- top / top_n ----------
+
+
+class TestTopHelpers:
+
+ def test_top_n_returns_first_k(self, patch_read, patch_postcode_valid):
+ patch_read.return_value = _build_df([
+ _row("47 GORDON ROAD", "100"),
+ _row("48 GORDON ROAD", "200"),
+ _row("49 GORDON ROAD", "300"),
+ ])
+ result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
+ top2 = result.top_n(2)
+ assert len(top2) == 2
+ assert all(isinstance(m, ScoredHistoricEpc) for m in top2)
+
+ def test_top_on_empty_matches_returns_none(self):
+ empty = HistoricEpcMatches(user_address="x", postcode="AB338AL", matches=[])
+ assert empty.top() is None
+ assert empty.top_n(5) == []
+ assert empty.unambiguous_uprn() is None
diff --git a/datatypes/epc/loaders/__init__.py b/datatypes/epc/loaders/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/datatypes/epc/loaders/historic_epc.py b/datatypes/epc/loaders/historic_epc.py
new file mode 100644
index 00000000..a4757d23
--- /dev/null
+++ b/datatypes/epc/loaders/historic_epc.py
@@ -0,0 +1,18 @@
+import csv
+
+from datatypes.epc.domain.historic_epc import HistoricEpc
+
+
+def _normalise(value: str | None) -> str:
+ if value is None:
+ return ""
+ return value.replace("\xa0", " ")
+
+
+def read_historic_epc_csv(path: str) -> list[HistoricEpc]:
+ with open(path, newline="", encoding="utf-8") as f:
+ reader = csv.DictReader(f)
+ return [
+ HistoricEpc(**{k.lower(): _normalise(v) for k, v in row.items()})
+ for row in reader
+ ]
diff --git a/datatypes/epc/schema/tests/test_historic_epc_loading.py b/datatypes/epc/schema/tests/test_historic_epc_loading.py
new file mode 100644
index 00000000..a42f383e
--- /dev/null
+++ b/datatypes/epc/schema/tests/test_historic_epc_loading.py
@@ -0,0 +1,49 @@
+import os
+
+import pytest
+
+from datatypes.epc.loaders.historic_epc import read_historic_epc_csv
+from datatypes.epc.domain.historic_epc import HistoricEpc
+
+FIXTURES = os.path.join(os.path.dirname(__file__), "fixtures")
+
+
+class TestHistoricEpcLoading:
+
+ @pytest.fixture
+ def epc(self) -> HistoricEpc:
+ rows = read_historic_epc_csv(os.path.join(FIXTURES, "historic_epc.csv"))
+ return rows[0]
+
+ def test_returns_historic_epc_instance(self, epc: HistoricEpc) -> None:
+ assert isinstance(epc, HistoricEpc)
+
+ def test_lmk_key(self, epc: HistoricEpc) -> None:
+ assert epc.lmk_key == "9292c3bf26a8876ce59274401ea73e3de5bd0b3e52a507c2162a46e57db8ea2f"
+
+ def test_address1(self, epc: HistoricEpc) -> None:
+ assert epc.address1 == "47 GORDON ROAD"
+
+ def test_postcode(self, epc: HistoricEpc) -> None:
+ assert epc.postcode == "AB33 8AL"
+
+ def test_current_energy_rating(self, epc: HistoricEpc) -> None:
+ assert epc.current_energy_rating == "E"
+
+ def test_property_type(self, epc: HistoricEpc) -> None:
+ assert epc.property_type == "House"
+
+ def test_built_form(self, epc: HistoricEpc) -> None:
+ assert epc.built_form == "Semi-Detached"
+
+ def test_inspection_date(self, epc: HistoricEpc) -> None:
+ assert epc.inspection_date == "2021-04-11"
+
+ def test_uprn(self, epc: HistoricEpc) -> None:
+ assert epc.uprn == "151020766.0"
+
+ def test_uprn_source(self, epc: HistoricEpc) -> None:
+ assert epc.uprn_source == "Energy Assessor"
+
+ def test_report_type(self, epc: HistoricEpc) -> None:
+ assert epc.report_type == "100"
diff --git a/recommendations/Costs.py b/recommendations/Costs.py
index bd8f160a..fc72d4d8 100644
--- a/recommendations/Costs.py
+++ b/recommendations/Costs.py
@@ -21,28 +21,28 @@ regional_labour_variations = [
{"Region": "Yorkshire and the Humber", "Adjustment_Factor": 0.86},
{"Region": "Wales", "Adjustment_Factor": 0.88},
{"Region": "Scotland", "Adjustment_Factor": 0.88},
- {"Region": "Northern Ireland", "Adjustment_Factor": 0.76}
+ {"Region": "Northern Ireland", "Adjustment_Factor": 0.76},
]
# Installers are now working with 435 watt panels
PANEL_SIZE = 0.435
INSTALLER_SOLAR_COSTS = [
- {'n_panels': 4, 'array_kwp': 4 * PANEL_SIZE, 'cost': 4089.25, 'installer': 'CEG'},
- {'n_panels': 5, 'array_kwp': 5 * PANEL_SIZE, 'cost': 4242.48, 'installer': 'CEG'},
- {'n_panels': 6, 'array_kwp': 6 * PANEL_SIZE, 'cost': 4395.71, 'installer': 'CEG'},
- {'n_panels': 7, 'array_kwp': 7 * PANEL_SIZE, 'cost': 4548.94, 'installer': 'CEG'},
- {'n_panels': 8, 'array_kwp': 8 * PANEL_SIZE, 'cost': 4702.17, 'installer': 'CEG'},
- {'n_panels': 9, 'array_kwp': 9 * PANEL_SIZE, 'cost': 4855.41, 'installer': 'CEG'},
- {'n_panels': 10, 'array_kwp': 10 * PANEL_SIZE, 'cost': 5010.95, 'installer': 'CEG'},
- {'n_panels': 11, 'array_kwp': 11 * PANEL_SIZE, 'cost': 5166.49, 'installer': 'CEG'},
- {'n_panels': 12, 'array_kwp': 12 * PANEL_SIZE, 'cost': 5322.04, 'installer': 'CEG'},
- {'n_panels': 13, 'array_kwp': 13 * PANEL_SIZE, 'cost': 5657.6, 'installer': 'CEG'},
- {'n_panels': 14, 'array_kwp': 14 * PANEL_SIZE, 'cost': 5993.16, 'installer': 'CEG'},
- {'n_panels': 15, 'array_kwp': 15 * PANEL_SIZE, 'cost': 6328.71, 'installer': 'CEG'},
- {'n_panels': 16, 'array_kwp': 16 * PANEL_SIZE, 'cost': 6483.33, 'installer': 'CEG'},
- {'n_panels': 17, 'array_kwp': 17 * PANEL_SIZE, 'cost': 6637.95, 'installer': 'CEG'},
- {'n_panels': 18, 'array_kwp': 18 * PANEL_SIZE, 'cost': 6792.57, 'installer': 'CEG'}
+ {"n_panels": 4, "array_kwp": 4 * PANEL_SIZE, "cost": 4089.25, "installer": "CEG"},
+ {"n_panels": 5, "array_kwp": 5 * PANEL_SIZE, "cost": 4242.48, "installer": "CEG"},
+ {"n_panels": 6, "array_kwp": 6 * PANEL_SIZE, "cost": 4395.71, "installer": "CEG"},
+ {"n_panels": 7, "array_kwp": 7 * PANEL_SIZE, "cost": 4548.94, "installer": "CEG"},
+ {"n_panels": 8, "array_kwp": 8 * PANEL_SIZE, "cost": 4702.17, "installer": "CEG"},
+ {"n_panels": 9, "array_kwp": 9 * PANEL_SIZE, "cost": 4855.41, "installer": "CEG"},
+ {"n_panels": 10, "array_kwp": 10 * PANEL_SIZE, "cost": 5010.95, "installer": "CEG"},
+ {"n_panels": 11, "array_kwp": 11 * PANEL_SIZE, "cost": 5166.49, "installer": "CEG"},
+ {"n_panels": 12, "array_kwp": 12 * PANEL_SIZE, "cost": 5322.04, "installer": "CEG"},
+ {"n_panels": 13, "array_kwp": 13 * PANEL_SIZE, "cost": 5657.6, "installer": "CEG"},
+ {"n_panels": 14, "array_kwp": 14 * PANEL_SIZE, "cost": 5993.16, "installer": "CEG"},
+ {"n_panels": 15, "array_kwp": 15 * PANEL_SIZE, "cost": 6328.71, "installer": "CEG"},
+ {"n_panels": 16, "array_kwp": 16 * PANEL_SIZE, "cost": 6483.33, "installer": "CEG"},
+ {"n_panels": 17, "array_kwp": 17 * PANEL_SIZE, "cost": 6637.95, "installer": "CEG"},
+ {"n_panels": 18, "array_kwp": 18 * PANEL_SIZE, "cost": 6792.57, "installer": "CEG"},
]
# These are costs we received from CRG, for pricing up air source heat pumps
@@ -80,7 +80,12 @@ INSTALLER_SOLAR_PV_INVERTER_COST = 7500
INSTALLER_SOLAR_PV_INVERTER_LABOUR_COST = 500 # Just a rough guess to labour costs
INSTALLER_SOLAR_BATTERY_COSTS = [
- {'capacity_kwh': 5, 'description': 'Battery Add on', 'cost': 3769.89, 'installer': 'JJC'},
+ {
+ "capacity_kwh": 5,
+ "description": "Battery Add on",
+ "cost": 3769.89,
+ "installer": "JJC",
+ },
# {'capacity_kwh': 10, 'description': 'Battery Add on', 'cost': 4300.00, 'installer': 'CEG'},
# {'capacity_kwh': 5, 'description': 'Battery Retrofit existing system', 'cost': 4250.00, 'installer': 'CEG'},
# {'capacity_kwh': 10, 'description': 'Battery Retrofit Existing system', 'cost': 5950.00, 'installer': 'CEG'}
@@ -102,10 +107,14 @@ TTZC_SMART_THERMOSTAT_LABOUR_HOURS = 2
TTZC_ELECTRICIAN_HOURLY_RATE = 45
# Based on cost of a Nest temperature sensor
TTZC_ROOM_TEMPERATURE_SENSOR_COST = 50
-TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS = 0.17 # (Assume ~ 10 mins install per sensor)
+TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS = (
+ 0.17 # (Assume ~ 10 mins install per sensor)
+)
# Basedon an average cost of smart radiator values
TTZC_SMART_RADIATOR_VALUES = 50
-TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS = 0.37 # (Assume ~ 15-30 mins install per valve)
+TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS = (
+ 0.37 # (Assume ~ 15-30 mins install per valve)
+)
# boiler prices based on
# This is the cost of a firs time central heating install from The Warm Front rate card
@@ -169,7 +178,7 @@ class Costs:
"heater_removal": 0.1,
"sealing_open_fireplace": 0.1,
"mechanical_ventilation": 0.26,
- "sloping_ceiling_insulation": 0.26 # Similar to IWI so using the same contingency
+ "sloping_ceiling_insulation": 0.26, # Similar to IWI so using the same contingency
}
# Preliminaries are a percentage of the total cost of the work and covers the cost of site-specific costs
@@ -195,36 +204,46 @@ class Costs:
:param property_instance: Instance of a Property class containing relevant details like wall area.
"""
- if not hasattr(property_instance, 'insulation_wall_area'):
- raise ValueError("Property instance must have an 'insulation_wall_area' attribute")
+ if not hasattr(property_instance, "insulation_wall_area"):
+ raise ValueError(
+ "Property instance must have an 'insulation_wall_area' attribute"
+ )
self.property = property_instance
self.regional_labour_variations = regional_labour_variations
self.region = county_to_region_map.get(self.property.epc_record.county, None)
if self.region is None:
# Try and grab using the local-authority-label
- self.region = county_to_region_map.get(self.property.epc_record.local_authority_label, None)
+ self.region = county_to_region_map.get(
+ self.property.epc_record.local_authority_label, None
+ )
if self.region is None:
# Try and get the region after converting the keys to lower
- self.region = {
- k.lower(): v for k, v in county_to_region_map.items()
- }.get(self.property.epc_record.local_authority_label.lower(), None)
+ if self.property.epc_record.local_authority_label is not None:
+ self.region = {
+ k.lower(): v for k, v in county_to_region_map.items()
+ }.get(self.property.epc_record.local_authority_label.lower(), None)
if self.region is None:
- logger.warning("No region found for county %s, defaulting to South East England",
- self.property.epc_record.county)
+ logger.warning(
+ "No region found for county %s, defaulting to South East England",
+ self.property.epc_record.county,
+ )
self.region = "South East England"
self.labour_adjustment_factor = [
- x["Adjustment_Factor"] for x in self.regional_labour_variations if
- x["Region"] == self.region
+ x["Adjustment_Factor"]
+ for x in self.regional_labour_variations
+ if x["Region"] == self.region
][0]
if not self.labour_adjustment_factor:
raise ValueError("Labour adjustment factor not found")
- def cavity_wall_insulation(self, wall_area, material, is_extraction_and_refill=False):
+ def cavity_wall_insulation(
+ self, wall_area, material, is_extraction_and_refill=False
+ ):
"""
Calculates the total cost for cavity wall insulation based on material and labor costs,
including contingency, preliminaries, profit, and VAT.
@@ -318,7 +337,8 @@ class Costs:
return {
"total": total_cost,
- "contingency": self.CONTINGENCIES["suspended_floor_insulation"] * total_cost,
+ "contingency": self.CONTINGENCIES["suspended_floor_insulation"]
+ * total_cost,
"contingency_rate": self.CONTINGENCIES["suspended_floor_insulation"],
"labour_hours": labour_hours,
"labour_days": labour_days,
@@ -370,8 +390,7 @@ class Costs:
# - Apply sub-linear scaling for realism
# - Enforce a minimum duration so estimates are not unrealistically low
labour_days = max(
- min_days,
- base_days * (insulation_floor_area / base_area) ** labour_exponent
+ min_days, base_days * (insulation_floor_area / base_area) ** labour_exponent
)
return labour_days
@@ -388,7 +407,9 @@ class Costs:
total_cost = material["total_cost"] * insulation_floor_area
daily_labour_rate = 300 # Based on checkatrade
- labour_days = self._estimate_number_of_days_for_solid_floor(insulation_floor_area)
+ labour_days = self._estimate_number_of_days_for_solid_floor(
+ insulation_floor_area
+ )
labour_cost = labour_days * daily_labour_rate
total_cost = total_cost + labour_cost
@@ -404,7 +425,6 @@ class Costs:
}
def low_energy_lighting(self, number_of_lights, material):
-
"""
Calculates the total cost for low energy lighting based on material and labor costs,
including contingency, preliminaries, profit, and VAT.
@@ -419,7 +439,7 @@ class Costs:
total_cost = material["total_cost"] * number_of_lights
labour_hours = 1
- labour_days = (labour_hours / 8)
+ labour_days = labour_hours / 8
return {
"total": total_cost,
@@ -450,26 +470,22 @@ class Costs:
}
@classmethod
- def solar_pv(
- cls,
- solar_product,
- scaffolding_options,
- n_floors
- ):
-
- """
-
- """
+ def solar_pv(cls, solar_product, scaffolding_options, n_floors):
+ """ """
system_cost = solar_product["total_cost"]
if not solar_product["includes_scaffolding"]:
# We base this on the number of floors
- scaffolding = [x["total_cost"] for x in scaffolding_options if x["size"] == n_floors]
+ scaffolding = [
+ x["total_cost"] for x in scaffolding_options if x["size"] == n_floors
+ ]
if not scaffolding:
# If we have no options, handle this
if n_floors <= 3:
- raise ValueError("No scaffolding options available for 3 or fewer floors")
+ raise ValueError(
+ "No scaffolding options available for 3 or fewer floors"
+ )
# We take the largest scaffolding option available
scaffolding_cost = max([x["total_cost"] for x in scaffolding_options])
else:
@@ -523,9 +539,9 @@ class Costs:
We base the estimates for the cost of electric room heaters on the cost per room as estimated by the
following article:
https://www.bestelectricradiators.co.uk/blog/cost-to-install-a-new-heating-system-uk/
-
+
:param number_heated_rooms: int, number of rooms to be heated
- :return:
+ :return:
"""
total_cost = 500 * number_heated_rooms
@@ -547,11 +563,11 @@ class Costs:
}
def high_heat_electric_storage_heaters(
- self, number_heated_rooms: int,
+ self,
+ number_heated_rooms: int,
needs_cylinder: bool,
- product: dict | None = None
+ product: dict | None = None,
):
-
"""
We base the estimates for the cost of electric storage heaters on the cost per room as estimated by the
energy saving trust
@@ -578,8 +594,11 @@ class Costs:
return {
"total": total_cost,
- "contingency": total_cost * self.CONTINGENCIES["high_heat_retention_storage_heaters"],
- "contingency_rate": self.CONTINGENCIES["high_heat_retention_storage_heaters"],
+ "contingency": total_cost
+ * self.CONTINGENCIES["high_heat_retention_storage_heaters"],
+ "contingency_rate": self.CONTINGENCIES[
+ "high_heat_retention_storage_heaters"
+ ],
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@@ -690,14 +709,14 @@ class Costs:
# The product costs are inclusive of VAT
product_costs = (
- TTZC_SMART_THERMOSTAT_COST +
- TTZC_ROOM_TEMPERATURE_SENSOR_COST * number_heated_rooms +
- TTZC_SMART_RADIATOR_VALUES * number_heated_rooms
+ TTZC_SMART_THERMOSTAT_COST
+ + TTZC_ROOM_TEMPERATURE_SENSOR_COST * number_heated_rooms
+ + TTZC_SMART_RADIATOR_VALUES * number_heated_rooms
)
labour_hours = (
- TTZC_SMART_THERMOSTAT_LABOUR_HOURS +
- TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS * number_heated_rooms +
- TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS * number_heated_rooms
+ TTZC_SMART_THERMOSTAT_LABOUR_HOURS
+ + TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS * number_heated_rooms
+ + TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS * number_heated_rooms
)
labour_costs = TTZC_ELECTRICIAN_HOURLY_RATE * labour_hours
# Add continency and preliminaries to the labour to account for the complexity of the job
@@ -722,7 +741,9 @@ class Costs:
"labour_days": labour_days,
}
- def programmer_trvs_bypass(self, number_heated_rooms, has_programmer, has_trvs, has_bypass):
+ def programmer_trvs_bypass(
+ self, number_heated_rooms, has_programmer, has_trvs, has_bypass
+ ):
total_cost = 0
labour_hours = 0
@@ -779,7 +800,9 @@ class Costs:
}
@staticmethod
- def _estimate_n_radiators(number_habitable_rooms, total_floor_area, property_type, built_form):
+ def _estimate_n_radiators(
+ number_habitable_rooms, total_floor_area, property_type, built_form
+ ):
# Base number of radiators: one per habitable room
base_radiators = number_habitable_rooms
@@ -787,34 +810,49 @@ class Costs:
additional_radiators = 3 # Initial assumption
# Adjust additional radiators based on property type
- if property_type == 'Flat':
- additional_radiators -= 1 # Flats may need fewer radiators due to less exposure
- elif property_type in ['House', 'Bungalow', 'Maisonette']:
+ if property_type == "Flat":
+ additional_radiators -= (
+ 1 # Flats may need fewer radiators due to less exposure
+ )
+ elif property_type in ["House", "Bungalow", "Maisonette"]:
# Multiple floors in Maisonette may require additional heating points
- additional_radiators += 2 # Houses and bungalows might need more due to greater exposure
+ additional_radiators += (
+ 2 # Houses and bungalows might need more due to greater exposure
+ )
else:
raise Exception("Invalid property type")
# Adjust total radiator needs based on built form
form_factor = {
- 'Enclosed Mid-Terrace': 0.9,
- 'Mid-Terrace': 0.95,
- 'Enclosed End-Terrace': 0.95,
- 'Semi-Detached': 1.05,
- 'Detached': 1.25,
- 'End-Terrace': 1.05
+ "Enclosed Mid-Terrace": 0.9,
+ "Mid-Terrace": 0.95,
+ "Enclosed End-Terrace": 0.95,
+ "Semi-Detached": 1.05,
+ "Detached": 1.25,
+ "End-Terrace": 1.05,
}
# Calculate total heating power needed and number of radiators based on standard output
total_heating_power_required = total_floor_area * 80 # Watts per square meter
radiator_output = 1000 # Average wattage per radiator
- total_radiators_based_on_power = (total_heating_power_required / radiator_output) * form_factor[built_form]
+ total_radiators_based_on_power = (
+ total_heating_power_required / radiator_output
+ ) * form_factor[built_form]
# Final estimation taking the higher of calculated needs or base room count
- estimated_radiators = max(total_radiators_based_on_power, base_radiators + additional_radiators)
+ estimated_radiators = max(
+ total_radiators_based_on_power, base_radiators + additional_radiators
+ )
return round(estimated_radiators)
- def boiler(self, exising_room_heaters, system_change, n_heated_rooms, n_rooms, is_electric=False):
+ def boiler(
+ self,
+ exising_room_heaters,
+ system_change,
+ n_heated_rooms,
+ n_rooms,
+ is_electric=False,
+ ):
"""
Based on a basic estimate of median value £2600 to install a low carbon combi boiler
First time central heating vosts can als be found here:
@@ -859,12 +897,14 @@ class Costs:
number_habitable_rooms=n_rooms,
total_floor_area=self.property.floor_area,
property_type=self.property.epc_record.property_type,
- built_form=self.property.epc_record.built_form
+ built_form=self.property.epc_record.built_form,
)
additionals_labour_cost = labour_rate * self.labour_adjustment_factor
radiator_cost = DOUBLE_RADIATOR_COST * n_radiators
- system_change_cost = radiator_cost + FLUE_COST + PIPEWORK_COST + additionals_labour_cost
+ system_change_cost = (
+ radiator_cost + FLUE_COST + PIPEWORK_COST + additionals_labour_cost
+ )
system_change_cost_before_vat = system_change_cost / (1 + self.VAT_RATE)
system_change_vat = system_change_cost - system_change_cost_before_vat
# We add an extra labour day for the system change
@@ -897,14 +937,18 @@ class Costs:
else:
return 250
- def air_source_heat_pump(self, ashp_size: float, number_heated_rooms: int, total_floor_area: float) -> dict:
+ def air_source_heat_pump(
+ self, ashp_size: float, number_heated_rooms: int, total_floor_area: float
+ ) -> dict:
"""
We produce a cost estimation for an air source heat pump, based on costs we have received from installers.
"""
system_cost = (
- (ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST) + ASHP_SECURITY + ASHP_WALL_BRACKET
+ (ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST)
+ + ASHP_SECURITY
+ + ASHP_WALL_BRACKET
)
available_n_rads = [x["n_radiators"] for x in ASHP_DISTRIBUTION_SYSTEM_COSTS]
@@ -940,7 +984,9 @@ class Costs:
}
@staticmethod
- def _estimate_number_of_days_for_sloping_ceiling(insulation_roof_area: float) -> float:
+ def _estimate_number_of_days_for_sloping_ceiling(
+ insulation_roof_area: float,
+ ) -> float:
"""
Estimate labour days required to insulate an existing sloping ceiling.
@@ -965,14 +1011,15 @@ class Costs:
min_days = 2
labour_days = max(
- min_days,
- base_days * (insulation_roof_area / base_area) ** labour_exponent
+ min_days, base_days * (insulation_roof_area / base_area) ** labour_exponent
)
return labour_days
@classmethod
- def sloping_ceiling_insulation(cls, insulation_roof_area: float) -> Mapping[str, float]:
+ def sloping_ceiling_insulation(
+ cls, insulation_roof_area: float
+ ) -> Mapping[str, float]:
"""
This costing for this is based on Checkatrade desktop research, since we are yet to receive installer quotes.
:param insulation_roof_area: Area of the sloping ceiling to be insulated
@@ -985,14 +1032,20 @@ class Costs:
# https://www.checkatrade.com/blog/cost-guides/vaulted-ceiling-cost/
# https://www.thegreenage.co.uk/can-i-insulate-my-sloping-ceiling/
# These assumptions last updated 21/02/2026
- insulation_cost_per_m2 = 52 # The actual install process is quite similar to IWI
+ insulation_cost_per_m2 = (
+ 52 # The actual install process is quite similar to IWI
+ )
labour_rate = 250 # per day
contingency_rate = cls.CONTINGENCIES["sloping_ceiling_insulation"]
- labour_days = cls._estimate_number_of_days_for_sloping_ceiling(insulation_roof_area)
+ labour_days = cls._estimate_number_of_days_for_sloping_ceiling(
+ insulation_roof_area
+ )
labour_hours = labour_days * 8
- total = (insulation_cost_per_m2 * insulation_roof_area) + (labour_rate * labour_days)
+ total = (insulation_cost_per_m2 * insulation_roof_area) + (
+ labour_rate * labour_days
+ )
# Assume VAT included in the total => total is 120% of subtotal
vat = total - (total / 1.2)
diff --git a/scripts/historic_epc_demo.py b/scripts/historic_epc_demo.py
new file mode 100644
index 00000000..b47c3a3c
--- /dev/null
+++ b/scripts/historic_epc_demo.py
@@ -0,0 +1,47 @@
+"""Demo: look up historic EPC records for an address + postcode.
+
+Reads the gzipped CSV at
+ s3://retrofit-data-dev/historical_epc//data.csv.gz
+scores rows against the user-provided address, and prints the top matches.
+
+Usage:
+ python -m scripts.historic_epc_demo "47 Gordon Road" "AB33 8AL"
+ python -m scripts.historic_epc_demo # uses defaults below
+"""
+
+import sys
+
+from datatypes.epc.domain.historic_epc_matching import match_addresses_for_postcode
+
+
+def main(user_address: str, postcode: str) -> None:
+ print(f"Looking up: {user_address!r} @ {postcode!r}\n")
+
+ result = match_addresses_for_postcode(user_address, postcode)
+
+ print(f"Found {len(result.matches)} candidate row(s).\n")
+
+ print("Top 3 matches:")
+ for m in result.top_n(3):
+ print(
+ f" rank={m.lexirank} score={m.lexiscore:.3f} "
+ f"uprn={m.record.uprn or '(none)':<14} {m.record.address}"
+ )
+
+ print()
+ uprn = result.unambiguous_uprn()
+ if uprn:
+ print(f"Unambiguous UPRN: {uprn}")
+ else:
+ print("No unambiguous UPRN (zero-score, tie, or empty result).")
+
+
+if __name__ == "__main__":
+ args = sys.argv[1:]
+ if len(args) == 2:
+ main(args[0], args[1])
+ elif len(args) == 0:
+ main("47 Gordon Road", "AB33 8AL")
+ else:
+ print(__doc__)
+ sys.exit(2)
diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py
index b275086d..5e3ce5d5 100644
--- a/sfr/principal_pitch/2_export_data.py
+++ b/sfr/principal_pitch/2_export_data.py
@@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials
from collections import defaultdict
from sqlalchemy import func
-PORTFOLIO_ID = 711
-SCENARIOS = [1233]
+PORTFOLIO_ID = 632
+SCENARIOS = [1144]
scenario_names = {
- 1233: "Reach EPC C",
+ 1144: "EPC C",
}
-project_name = "Novus"
+project_name = "Calico Refresh"
def get_data(portfolio_id, scenario_ids):
diff --git a/utils/pandas_utils.py b/utils/pandas_utils.py
new file mode 100644
index 00000000..b32cde10
--- /dev/null
+++ b/utils/pandas_utils.py
@@ -0,0 +1,14 @@
+from typing import Any
+
+import pandas as pd
+
+
+def pandas_cell_to_str(v: Any) -> str:
+ if v is None or (isinstance(v, float) and pd.isna(v)):
+ return ""
+ s = str(v).replace("\xa0", " ")
+ # get_uprn_candidates runs .astype(str) on UPRN, turning NaN into "nan".
+ # Treat that as missing so unambiguous_uprn truthiness checks work.
+ if s.lower() == "nan":
+ return ""
+ return s
diff --git a/utils/s3.py b/utils/s3.py
index 930e2e15..13d272e7 100644
--- a/utils/s3.py
+++ b/utils/s3.py
@@ -6,8 +6,6 @@ from io import BytesIO, StringIO
from urllib.parse import unquote
from utils.logger import setup_logger
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
-from typing import Any
-
logger = setup_logger()
@@ -167,6 +165,21 @@ def read_dataframe_from_s3_parquet(bucket_name, file_key):
return df
+def read_csv_gz_from_s3(bucket_name: str, file_key: str) -> pd.DataFrame:
+ """
+ Read a gzipped CSV from S3 into a pandas DataFrame.
+
+ :param bucket_name: Name of the S3 bucket.
+ :param file_key: Key of the file (must end in .csv.gz).
+ :return: A pandas DataFrame.
+ """
+ if not file_key.endswith(".csv.gz"):
+ raise ValueError("file_key must end with .csv.gz")
+
+ buffer = read_io_from_s3(bucket_name=bucket_name, file_key=file_key)
+ return pd.read_csv(buffer, compression="gzip", low_memory=False)
+
+
def save_csv_to_s3(dataframe, bucket_name, file_name):
"""
Save a Pandas DataFrame to a CSV file in an S3 bucket.