Merge branch 'main' into feature/magicplan-trigger

This commit is contained in:
Daniel Roth 2026-05-11 09:06:47 +00:00
commit 6fd95d20af
22 changed files with 1145 additions and 345 deletions

View file

@ -31,17 +31,19 @@ from recommendations.recommendation_utils import (
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from dotenv import load_dotenv # from dotenv import load_dotenv
logger = setup_logger() logger = setup_logger()
load_dotenv(dotenv_path="../backend/.env") # load_dotenv(dotenv_path="../backend/.env")
# OpenAI API Key (set this in your environment variables for security) # OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") # OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper: class DataRemapper:
def __init__(self, standard_values, standard_map=None, max_tokens=1000): def __init__(
self, standard_values, standard_map=None, max_tokens=1000, api_key=None
):
""" """
Initialize the remapper with standard values and a predefined mapping. Initialize the remapper with standard values and a predefined mapping.
@ -75,7 +77,8 @@ class DataRemapper:
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000}, "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
} }
self.openai_client = OpenAI(api_key=OPENAI_API_KEY) print(f"DATA REMAPPER api key is {api_key}")
self.openai_client = OpenAI(api_key=api_key)
@staticmethod @staticmethod
def clean_string(text): def clean_string(text):
@ -136,12 +139,20 @@ class DataRemapper:
raise ValueError("Input tokens exceed the maximum limit.") raise ValueError("Input tokens exceed the maximum limit.")
logger.info("Calling OpenAI API for standardization...") logger.info("Calling OpenAI API for standardization...")
response = self.openai_client.chat.completions.create(
model=self.ai_model, try:
messages=[{"role": "user", "content": prompt}], response = self.openai_client.chat.completions.create(
max_tokens=self.max_tokens, model=self.ai_model,
temperature=0.1, messages=[{"role": "user", "content": prompt}],
) max_tokens=self.max_tokens,
temperature=0.1,
)
except Exception as e:
print(f"[debug] OpenAI call failed. type={type(e).__name__}")
print(f"[debug] status={getattr(e, 'status_code', None)}")
print(f"[debug] body={getattr(e, 'response', None) and e.response.text}")
print(f"[debug] model={self.ai_model}")
raise
output_text = response.choices[0].message.content.strip() output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens output_tokens = self.count_tokens(output_text) # Count output tokens
@ -504,6 +515,7 @@ class AssetList:
landlord_block_reference=None, landlord_block_reference=None,
phase=False, phase=False,
header=0, header=0,
openai_api_key=None,
): ):
self.local_filepath = local_filepath self.local_filepath = local_filepath
self.sheet_name = sheet_name self.sheet_name = sheet_name
@ -529,6 +541,7 @@ class AssetList:
self.ecosurv = None self.ecosurv = None
self.ecosurv_no_match = pd.DataFrame() self.ecosurv_no_match = pd.DataFrame()
self.geographical_areas = pd.DataFrame() self.geographical_areas = pd.DataFrame()
self.openai_api_key = openai_api_key
# When this is True, we intend to break the programme into multiple phases. We may need to review # When this is True, we intend to break the programme into multiple phases. We may need to review
# how this is structured in the future, as depending on how we get future data, we may need to # how this is structured in the future, as depending on how we get future data, we may need to
@ -1107,6 +1120,7 @@ class AssetList:
remapper = DataRemapper( remapper = DataRemapper(
standard_values=config["standard_values"], standard_values=config["standard_values"],
standard_map=config["standard_map"], standard_map=config["standard_map"],
api_key=self.openai_api_key,
) )
remap_dictionary = remapper.standardize_list( remap_dictionary = remapper.standardize_list(
values_to_remap=values_to_remap.tolist() values_to_remap=values_to_remap.tolist()
@ -1296,8 +1310,8 @@ class AssetList:
self.standardised_asset_list[ self.standardised_asset_list[
self.ATTRIBUTE_HAS_SOLAR self.ATTRIBUTE_HAS_SOLAR
] = self.standardised_asset_list[ ] = self.standardised_asset_list[
self.FIND_EPC_DATA_NAMES["Solar photovoltaics"] self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]
] | ~self.standardised_asset_list[ ] | ~self.standardised_asset_list[
self.EPC_API_DATA_NAMES["photo-supply"] self.EPC_API_DATA_NAMES["photo-supply"]
].isin( ].isin(
["0.0", 0, None, "", np.nan] ["0.0", 0, None, "", np.nan]
@ -1315,7 +1329,7 @@ class AssetList:
property_type=( property_type=(
str(x[self.STANDARD_PROPERTY_TYPE]).title() str(x[self.STANDARD_PROPERTY_TYPE]).title()
if str(x[self.STANDARD_PROPERTY_TYPE]).title() if str(x[self.STANDARD_PROPERTY_TYPE]).title()
in accepted_epc_property_types in accepted_epc_property_types
else ( else (
x[self.EPC_API_DATA_NAMES["property-type"]] x[self.EPC_API_DATA_NAMES["property-type"]]
if not pd.isnull( if not pd.isnull(
@ -1373,9 +1387,9 @@ class AssetList:
self.standardised_asset_list.apply( self.standardised_asset_list.apply(
lambda x: estimate_perimeter( lambda x: estimate_perimeter(
floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]]
/ x[self.ATTRIBUTE_NUMBER_OF_FLOORS], / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]]
/ x[self.ATTRIBUTE_NUMBER_OF_FLOORS], / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
), ),
axis=1, axis=1,
) )
@ -1460,7 +1474,7 @@ class AssetList:
year_lower_bound = ( year_lower_bound = (
2007 2007
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] if x[self.EPC_API_DATA_NAMES["construction-age-band"]]
== "England and Wales: 2007 onwards" == "England and Wales: 2007 onwards"
else 2012 else 2012
) )
@ -1515,7 +1529,7 @@ class AssetList:
age_band_matches = ( age_band_matches = (
"EPC Age Band Matches Year Built" "EPC Age Band Matches Year Built"
if x[self.STANDARD_YEAR_BUILT] if x[self.STANDARD_YEAR_BUILT]
== int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) == int(x[self.EPC_API_DATA_NAMES["construction-age-band"]])
else "EPC Age Band is different from Year Built" else "EPC Age Band is different from Year Built"
) )
@ -1545,7 +1559,7 @@ class AssetList:
age_band_matches = ( age_band_matches = (
"EPC Age Band Matches Year Built" "EPC Age Band Matches Year Built"
if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date)) if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date))
and (x[self.STANDARD_YEAR_BUILT] <= float(upper_date)) and (x[self.STANDARD_YEAR_BUILT] <= float(upper_date))
else ( else (
"EPC Age Band is older than Year Built" "EPC Age Band is older than Year Built"
if x[self.STANDARD_YEAR_BUILT] > float(upper_date) if x[self.STANDARD_YEAR_BUILT] > float(upper_date)
@ -1717,22 +1731,22 @@ class AssetList:
if self.non_intrusives_present: if self.non_intrusives_present:
if self.new_format_non_insturives_present_v2: if self.new_format_non_insturives_present_v2:
non_intrusives_wall_filter = ( non_intrusives_wall_filter = (
self.standardised_asset_list["non-intrusives: Construction"] self.standardised_asset_list["non-intrusives: Construction"]
== "CAVITY" == "CAVITY"
) & self.standardised_asset_list["non-intrusives: Insulated"].isin( ) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EMPTY", "PARTIAL", "EMPTY CAVITY"] ["EMPTY", "PARTIAL", "EMPTY CAVITY"]
) )
else: else:
non_intrusives_wall_filter = ( non_intrusives_wall_filter = (
self.standardised_asset_list["non-intrusives: Construction"] self.standardised_asset_list["non-intrusives: Construction"]
== "CAVITY" == "CAVITY"
) & self.standardised_asset_list["non-intrusives: Insulated"].isin( ) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EMPTY", "PARTIAL"] ["EMPTY", "PARTIAL"]
) )
elif self.old_format_non_intrusives_present: elif self.old_format_non_intrusives_present:
non_intrusives_wall_filter = self.standardised_asset_list[ non_intrusives_wall_filter = self.standardised_asset_list[
"non-intrusives: WFT Findings" "non-intrusives: WFT Findings"
].str.lower().str.strip().isin( ].str.lower().str.strip().isin(
[ [
"empty cavity", "empty cavity",
"partial fill", "partial fill",
@ -1742,18 +1756,18 @@ class AssetList:
"empty cav", "empty cav",
] ]
) | ( ) | (
( (
self.standardised_asset_list["non-intrusives: WFT Findings"] self.standardised_asset_list["non-intrusives: WFT Findings"]
.str.lower() .str.lower()
.str.strip() .str.strip()
.str.contains("empty cavity|partial fill") .str.contains("empty cavity|partial fill")
& ~self.standardised_asset_list["non-intrusives: WFT Findings"] & ~self.standardised_asset_list["non-intrusives: WFT Findings"]
.astype(str) .astype(str)
.str.lower() .str.lower()
.str.strip() .str.strip()
.str.contains("major access issues") .str.contains("major access issues")
) )
) )
else: else:
# We set the filter to False, as we have no non-intrusives # We set the filter to False, as we have no non-intrusives
non_intrusives_wall_filter = False non_intrusives_wall_filter = False
@ -1765,12 +1779,12 @@ class AssetList:
) )
else: else:
year_built_filter = ( year_built_filter = (
self.standardised_asset_list[self.STANDARD_YEAR_BUILT] self.standardised_asset_list[self.STANDARD_YEAR_BUILT]
<= self.EMPTY_CAVITY_YEAR_THRESHOLD <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) | ( ) | (
self.standardised_asset_list["epc_year_upper_bound"] self.standardised_asset_list["epc_year_upper_bound"]
<= self.EMPTY_CAVITY_YEAR_THRESHOLD <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) )
# Criteria: # Criteria:
# The property isn't a bedsit # The property isn't a bedsit
@ -1811,8 +1825,8 @@ class AssetList:
] = ( ] = (
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
& ~self.standardised_asset_list[ & ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity_has_solar" "non_intrusive_indicates_empty_cavity_has_solar"
] ]
& ( & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
["bedsit"] ["bedsit"]
@ -1888,8 +1902,8 @@ class AssetList:
.str.lower() .str.lower()
.isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS) .isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS)
| self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin( | self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
["uninsulated cavity"] ["uninsulated cavity"]
) )
) )
###################################################### ######################################################
@ -1926,8 +1940,8 @@ class AssetList:
extraction_wall_filter = ( extraction_wall_filter = (
extraction_wall_filter extraction_wall_filter
& ~self.standardised_asset_list[ & ~self.standardised_asset_list[
"non-intrusives: Eligibility (Red/Yellow/Green)" "non-intrusives: Eligibility (Red/Yellow/Green)"
].isin(["RED"]) ].isin(["RED"])
) )
self.standardised_asset_list[ self.standardised_asset_list[
@ -2023,26 +2037,26 @@ class AssetList:
self.standardised_asset_list[ self.standardised_asset_list[
"solar_epc_data_indicates_correct_heating_system" "solar_epc_data_indicates_correct_heating_system"
] = ( ] = (
self.standardised_asset_list[ self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"] self.EPC_API_DATA_NAMES["mainheat-description"]
] ]
.str.lower() .str.lower()
.str.contains( .str.contains(
"air source heat pump|ground source heat pump|boiler and radiators, electric" "air source heat pump|ground source heat pump|boiler and radiators, electric"
)
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains("electric storage heaters")
& (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
== "Controls for high heat retention storage heaters"
)
) )
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains("electric storage heaters")
& (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
== "Controls for high heat retention storage heaters"
)
)
# If the landlord has given us the heating system, we default to that on heating upgrades. Because of the # If the landlord has given us the heating system, we default to that on heating upgrades. Because of the
# poor heating in place, if the EPC indicates that this property had a low efficiency heating system but the # poor heating in place, if the EPC indicates that this property had a low efficiency heating system but the
@ -2050,25 +2064,25 @@ class AssetList:
self.standardised_asset_list[ self.standardised_asset_list[
"solar_epc_data_indicates_requires_heating_upgrade" "solar_epc_data_indicates_requires_heating_upgrade"
] = ( ] = (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]
]
.str.lower()
.str.contains("electric storage heaters|room heaters")
& (
self.standardised_asset_list[ self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"] self.EPC_API_DATA_NAMES["mainheatcont-description"]
] ]
.str.lower() != "Controls for high heat retention storage heaters"
.str.contains("electric storage heaters|room heaters")
& (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
]
!= "Controls for high heat retention storage heaters"
)
) & (
~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["district heating", "communal heating", "communal gas boiler"]
)
& ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
.astype(str)
.str.contains("gas ")
) )
) & (
~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["district heating", "communal heating", "communal gas boiler"]
)
& ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
.astype(str)
.str.contains("gas ")
)
# Basic check - both of the previous two shouldn't be true simultaneously # Basic check - both of the previous two shouldn't be true simultaneously
if ( if (
@ -2148,8 +2162,8 @@ class AssetList:
self.standardised_asset_list[ self.standardised_asset_list[
"solar_non_intrusives_walls_insulated" "solar_non_intrusives_walls_insulated"
] = self.standardised_asset_list[ ] = self.standardised_asset_list[
"non-intrusives: WFT Findings" "non-intrusives: WFT Findings"
].str.lower().str.strip().isin( ].str.lower().str.strip().isin(
[ [
"retro drilled", "retro drilled",
"retro filled", "retro filled",
@ -2158,8 +2172,8 @@ class AssetList:
"retro drilled and filled", "retro drilled and filled",
] ]
) | self.standardised_asset_list[ ) | self.standardised_asset_list[
"non-intrusives: WFT Findings" "non-intrusives: WFT Findings"
].str.lower().str.strip().str.contains( ].str.lower().str.strip().str.contains(
"retro drilled" "retro drilled"
) )
else: else:
@ -2176,19 +2190,14 @@ class AssetList:
) )
self.standardised_asset_list["solar_epc_walls_insulated"] = ( self.standardised_asset_list["solar_epc_walls_insulated"] = (
self.standardised_asset_list[ self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]]
self.EPC_API_DATA_NAMES[ .str.lower()
"walls-description"]] .str.contains("|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS))
.str.lower() ) | (
.str.contains("|".join( self.standardised_asset_list["walls_u_value"].apply(
self.EPC_INSULATED_WALLS_SUBSTRINGS)) lambda x: x <= 0.7 if not pd.isnull(x) else False
) | ( )
self.standardised_asset_list[ )
"walls_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(
x) else False
)
)
roof_data = [] roof_data = []
for desc in self.standardised_asset_list[ for desc in self.standardised_asset_list[
@ -2230,20 +2239,20 @@ class AssetList:
self.standardised_asset_list[ self.standardised_asset_list[
"solar_epc_loft_needs_topup" "solar_epc_loft_needs_topup"
] = self.standardised_asset_list[ ] = self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
].apply( ].apply(
lambda x: int(x) < 200 if str(x).isdigit() else False lambda x: int(x) < 200 if str(x).isdigit() else False
) | ( ) | (
( (
self.standardised_asset_list["is_loft"] self.standardised_asset_list["is_loft"]
| self.standardised_asset_list["is_pitched"] | self.standardised_asset_list["is_pitched"]
)
& (
self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
].isin(["below average", "none"])
)
) )
& (
self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
].isin(["below average", "none"])
)
)
self.standardised_asset_list["epc_has_floor_recommendation"] = ( self.standardised_asset_list["epc_has_floor_recommendation"] = (
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False) self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
@ -2252,16 +2261,15 @@ class AssetList:
# Check if the boiler is electric # Check if the boiler is electric
# We check if it contains both the terms boiler & electric # We check if it contains both the terms boiler & electric
self.standardised_asset_list["has_electric_boiler"] = ( self.standardised_asset_list["has_electric_boiler"] = (
self.standardised_asset_list[ self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"] self.EPC_API_DATA_NAMES["mainheat-description"]
] ]
.str.lower() .str.lower()
.isin(["boiler and radiators, electric"]) .isin(["boiler and radiators, electric"])
) | ( ) | (
self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
self.STANDARD_HEATING_SYSTEM] == "electric boiler"
== "electric boiler" )
)
#################################### ####################################
# Check solar eligibility # Check solar eligibility
@ -2399,11 +2407,11 @@ class AssetList:
empty_cavity_map = { empty_cavity_map = {
"non_intrusive_indicates_empty_cavity": self.EMPTY_CAVITY_NON_INTRUSIVE "non_intrusive_indicates_empty_cavity": self.EMPTY_CAVITY_NON_INTRUSIVE
+ ": ", + ": ",
"non_intrusive_indicates_empty_cavity_has_solar": f"{self.EMPTY_CAVITY_NON_INTRUSIVE} - property " "non_intrusive_indicates_empty_cavity_has_solar": f"{self.EMPTY_CAVITY_NON_INTRUSIVE} - property "
"already has solar: ", "already has solar: ",
"non_intrusive_indicates_empty_cavity_no_year_filter": f"{self.EMPTY_CAVITY_NON_INTRUSIVE}, " "non_intrusive_indicates_empty_cavity_no_year_filter": f"{self.EMPTY_CAVITY_NON_INTRUSIVE}, "
f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ", f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
} }
for variable, description in empty_cavity_map.items(): for variable, description in empty_cavity_map.items():
self.standardised_asset_list["cavity_reason"] = np.where( self.standardised_asset_list["cavity_reason"] = np.where(
@ -2419,8 +2427,8 @@ class AssetList:
( (
self.standardised_asset_list["epc_indicates_empty_cavity"] self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[ & ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity" "non_intrusive_indicates_empty_cavity"
] ]
& ( & (
self.standardised_asset_list["non-intrusives: WFT Findings"] self.standardised_asset_list["non-intrusives: WFT Findings"]
.str.lower() .str.lower()
@ -2445,8 +2453,8 @@ class AssetList:
( (
self.standardised_asset_list["epc_indicates_empty_cavity"] self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[ & ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity" "non_intrusive_indicates_empty_cavity"
] ]
& self.standardised_asset_list[ & self.standardised_asset_list[
"non_intrusive_indicates_cavity_extraction" "non_intrusive_indicates_cavity_extraction"
] ]
@ -2461,8 +2469,8 @@ class AssetList:
( (
self.standardised_asset_list["epc_indicates_empty_cavity"] self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[ & ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity" "non_intrusive_indicates_empty_cavity"
] ]
& ( & (
self.standardised_asset_list["non-intrusives: Insulated"] self.standardised_asset_list["non-intrusives: Insulated"]
== "RETRO DRILLED" == "RETRO DRILLED"
@ -2478,8 +2486,8 @@ class AssetList:
( (
self.standardised_asset_list["epc_indicates_empty_cavity"] self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[ & ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity" "non_intrusive_indicates_empty_cavity"
] ]
& ( & (
self.standardised_asset_list["non-intrusives: Insulated"] self.standardised_asset_list["non-intrusives: Insulated"]
== "FILLED AT BUILD" == "FILLED AT BUILD"
@ -2495,8 +2503,8 @@ class AssetList:
( (
self.standardised_asset_list["epc_indicates_empty_cavity"] self.standardised_asset_list["epc_indicates_empty_cavity"]
& ~self.standardised_asset_list[ & ~self.standardised_asset_list[
"non_intrusive_indicates_empty_cavity" "non_intrusive_indicates_empty_cavity"
] ]
& pd.isnull(self.standardised_asset_list["cavity_reason"]) & pd.isnull(self.standardised_asset_list["cavity_reason"])
), ),
f"{self.EPC_EMPTY}: " + self.standardised_asset_list["SAP Category"], f"{self.EPC_EMPTY}: " + self.standardised_asset_list["SAP Category"],
@ -2640,7 +2648,7 @@ class AssetList:
identified_work = self.standardised_asset_list[ identified_work = self.standardised_asset_list[
~pd.isnull(self.standardised_asset_list["cavity_reason"]) ~pd.isnull(self.standardised_asset_list["cavity_reason"])
| ~pd.isnull(self.standardised_asset_list["solar_reason"]) | ~pd.isnull(self.standardised_asset_list["solar_reason"])
][self.DOMNA_PROPERTY_ID].values ][self.DOMNA_PROPERTY_ID].values
if self.DOMNA_PROPERTY_ID in self.outcomes.columns: if self.DOMNA_PROPERTY_ID in self.outcomes.columns:
self.outcomes_for_output = self.outcomes[ self.outcomes_for_output = self.outcomes[
@ -2675,12 +2683,12 @@ class AssetList:
blocks_of_flats = self.standardised_asset_list[ blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
== "block of flats" == "block of flats"
] ]
non_blocks_of_flats = self.standardised_asset_list[ non_blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
!= "block of flats" != "block of flats"
] ]
# Produce some aggregate figures # Produce some aggregate figures
self.work_type_figures = { self.work_type_figures = {
@ -2723,7 +2731,7 @@ class AssetList:
blocks = self.standardised_asset_list[ blocks = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
== "block of flats" == "block of flats"
].copy() ].copy()
if blocks.empty: if blocks.empty:
return return
@ -2860,7 +2868,7 @@ class AssetList:
self.standardised_asset_list = self.standardised_asset_list[ self.standardised_asset_list = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
!= "block of flats" != "block of flats"
] ]
self.standardised_asset_list = pd.concat( self.standardised_asset_list = pd.concat(
[self.standardised_asset_list, expanded_blocks], ignore_index=True [self.standardised_asset_list, expanded_blocks], ignore_index=True
@ -2940,7 +2948,7 @@ class AssetList:
# find any block refs with more than 50% emptires # find any block refs with more than 50% emptires
viable_empty_blocks = self.block_analysis_df[ viable_empty_blocks = self.block_analysis_df[
self.block_analysis_df["Percentage of Empties"] >= 0.50 self.block_analysis_df["Percentage of Empties"] >= 0.50
] ]
if not viable_empty_blocks.empty: if not viable_empty_blocks.empty:
project_code_lookup = viable_empty_blocks[["Block Reference"]].copy() project_code_lookup = viable_empty_blocks[["Block Reference"]].copy()
@ -3179,7 +3187,7 @@ class AssetList:
contact_details = pd.read_excel(local_filepath, sheet_name=sheet_name)[ contact_details = pd.read_excel(local_filepath, sheet_name=sheet_name)[
[self.contact_detail_fields["landlord_property_id"]] + details_colnames [self.contact_detail_fields["landlord_property_id"]] + details_colnames
] ]
contact_details = contact_details[ contact_details = contact_details[
~pd.isnull( ~pd.isnull(
contact_details[self.contact_detail_fields["landlord_property_id"]] contact_details[self.contact_detail_fields["landlord_property_id"]]
@ -3572,13 +3580,10 @@ class AssetList:
"Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>": date_of_inspections, "Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>": date_of_inspections,
"Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>": non_intrusives_construction, "Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>": non_intrusives_construction,
"Non-intrusives: Insulation <LISTING non_intrusives__insulation>": non_intrusives_insulated, "Non-intrusives: Insulation <LISTING non_intrusives__insulation>": non_intrusives_insulated,
"Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>": "Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>": non_intrusives_insulation_material,
non_intrusives_insulation_material, "Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>": non_intrusives_ciga_check_required,
"Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>":
non_intrusives_ciga_check_required,
"Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>": non_intrusives_pv_access, "Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>": non_intrusives_pv_access,
"Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>": "Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>": non_intrusives_roof_orientation,
non_intrusives_roof_orientation,
"Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>": non_intrusives_surveyor_notes, "Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>": non_intrusives_surveyor_notes,
"Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>": non_intrusives_surveyor_name, "Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>": non_intrusives_surveyor_name,
"CIGA: Date Requested <LISTING ciga__date_requested>": None, # TODO: Don't have this for the moment "CIGA: Date Requested <LISTING ciga__date_requested>": None, # TODO: Don't have this for the moment
@ -3755,8 +3760,8 @@ class AssetList:
# We compare address line 1 to full address # We compare address line 1 to full address
if any( if any(
df[self.STANDARD_FULL_ADDRESS] df[self.STANDARD_FULL_ADDRESS]
.str.lower() .str.lower()
.str.contains(row["Address Line 1"].lower(), na=False) .str.contains(row["Address Line 1"].lower(), na=False)
): ):
df = df[ df = df[
df[self.STANDARD_FULL_ADDRESS] df[self.STANDARD_FULL_ADDRESS]
@ -3996,7 +4001,7 @@ class AssetList:
matched = matched[ matched = matched[
matched["houseno"].astype(str) == house_no_to_match matched["houseno"].astype(str) == house_no_to_match
] ]
if matched.shape[0] == 1: if matched.shape[0] == 1:
lookup_i.append( lookup_i.append(
{ {
@ -4021,7 +4026,7 @@ class AssetList:
)[0] )[0]
matched = matched[ matched = matched[
matched[self.STANDARD_FULL_ADDRESS] == best_match matched[self.STANDARD_FULL_ADDRESS] == best_match
] ]
lookup_i.append( lookup_i.append(
{ {
"row_id": x["row_id"], "row_id": x["row_id"],
@ -4332,7 +4337,7 @@ class AssetList:
df = self.standardised_asset_list[ df = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID]
== row[master_id_colnames[idx]] == row[master_id_colnames[idx]]
] ]
if df.shape[0] == 1: if df.shape[0] == 1:
matched.append( matched.append(
{ {
@ -4438,7 +4443,7 @@ class AssetList:
)[1] )[1]
) )
> 90 > 90
] ]
if df.shape[0] == 0: if df.shape[0] == 0:
unmatched.append(row["row_id"]) unmatched.append(row["row_id"])
@ -4446,8 +4451,8 @@ class AssetList:
if any( if any(
df[self.STANDARD_FULL_ADDRESS] df[self.STANDARD_FULL_ADDRESS]
.str.lower() .str.lower()
.str.contains( .str.contains(
" ".join( " ".join(
[row[house_no_col], row["Street / Block Name"]] [row[house_no_col], row["Street / Block Name"]]
).lower() ).lower()
@ -4474,7 +4479,7 @@ class AssetList:
row[property_type_col].split(" ")[-1].lower() row[property_type_col].split(" ")[-1].lower()
) )
& (df[self.STANDARD_PROPERTY_TYPE] != "block of flats") & (df[self.STANDARD_PROPERTY_TYPE] != "block of flats")
] ]
if df.shape[0] != 1: if df.shape[0] != 1:
# We have multiple matches - it's likely because the landlord has a duplicate # We have multiple matches - it's likely because the landlord has a duplicate

View file

@ -21,6 +21,11 @@ EPC_AUTH_TOKEN = os.getenv(
OPENAI_API_KEY = os.getenv( OPENAI_API_KEY = os.getenv(
"OPENAI_API_KEY", "OPENAI_API_KEY",
) )
print(
f"[debug] OPENAI_API_KEY loaded: "
f"{OPENAI_API_KEY[:8]}...{OPENAI_API_KEY[-4:] if OPENAI_API_KEY else 'NONE'} "
f"(len={len(OPENAI_API_KEY) if OPENAI_API_KEY else 0})"
)
def extract_address1( def extract_address1(
@ -74,23 +79,23 @@ def app():
""" """
data_folder = "/workspaces/model/asset_list" data_folder = "/workspaces/model/asset_list"
data_filename = "2026-04-22T08_22_00.779745_61049fd3.xlsx" data_filename = "input.xlsx"
sheet_name = "in" sheet_name = "Handovers"
postcode_column = "postcode_clean" postcode_column = "POSTCODE"
address1_column = "address2uprn_address" address1_column = "Full Addres"
address1_method = None address1_method = None
fulladdress_column = "address2uprn_address" fulladdress_column = "Full Addres"
address_cols_to_concat = [] address_cols_to_concat = []
missing_postcodes_method = None missing_postcodes_method = None
landlord_year_built = None landlord_year_built = None
landlord_os_uprn = "address2uprn_uprn" landlord_os_uprn = "domna_found_uprn"
landlord_property_type = "Property Type" # Good to include if landlord gave landlord_property_type = "PROPERTY TYPE" # Good to include if landlord gave
landlord_built_form = "Built Form" # Good to include if landlord gave landlord_built_form = "Type Description" # Good to include if landlord gave
landlord_wall_construction = None landlord_wall_construction = None
landlord_roof_construction = None landlord_roof_construction = None
landlord_heating_system = None landlord_heating_system = None
landlord_existing_pv = None landlord_existing_pv = None
landlord_property_id = "UPRN" landlord_property_id = "PROP REF"
landlord_sap = None landlord_sap = None
outcomes_filename = None outcomes_filename = None
outcomes_sheetname = None outcomes_sheetname = None
@ -131,6 +136,7 @@ def app():
landlord_sap=landlord_sap, landlord_sap=landlord_sap,
landlord_block_reference=landlord_block_reference, landlord_block_reference=landlord_block_reference,
phase=phase, phase=phase,
openai_api_key=OPENAI_API_KEY,
) )
asset_list.init_standardise() asset_list.init_standardise()
@ -462,3 +468,9 @@ def app():
asset_list.duplicated_addresses.to_excel( asset_list.duplicated_addresses.to_excel(
writer, sheet_name="Duplicate Properties", index=False writer, sheet_name="Duplicate Properties", index=False
) )
for key,value in dict.items():
lsakjfldsa

View file

@ -17,16 +17,12 @@ from utils.s3 import (
from datetime import datetime from datetime import datetime
from backend.utils.addressMatch import AddressMatch from backend.utils.addressMatch import AddressMatch
from backend.address2UPRN.scoring import ( # noqa: F401 (re-exported)
logger = setup_logger() df_has_single_uprn,
get_uprn_candidates,
EPC_AUTH_TOKEN = os.getenv(
"EPC_AUTH_TOKEN",
) )
if EPC_AUTH_TOKEN is None: logger = setup_logger()
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
def score_addresses( def score_addresses(
@ -45,7 +41,10 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
Recursively fetch EPC data by postcode. Recursively fetch EPC data by postcode.
If results hit the size limit, retry with double size up to max_attempts. If results hit the size limit, retry with double size up to max_attempts.
""" """
client = EpcClient(auth_token=EPC_AUTH_TOKEN) auth_token = os.getenv("EPC_AUTH_TOKEN")
if auth_token is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
client = EpcClient(auth_token=auth_token)
url = os.path.join(client.domestic.host, "search") url = os.path.join(client.domestic.host, "search")
@ -88,65 +87,6 @@ def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
return results_df return results_df
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
"""
Returns True if all non-null UPRNs in df match the given uprn.
Returns False otherwise.
"""
if column not in df.columns:
return False
# Drop nulls and normalise to string
uprns = df[column].dropna().astype(str).str.strip().unique()
# No valid UPRNs to compare
if len(uprns) == 0:
return False
# Exactly one unique UPRN and it matches
return len(uprns) == 1 and uprns[0] == str(uprn)
def get_uprn_candidates(
df: pd.DataFrame,
user_address: str,
address_column: str = "address",
uprn_column: str = "uprn",
) -> pd.DataFrame:
"""
Annotate EPC results with lexicographical similarity scores and ranks.
Returns a DataFrame sorted by descending lexiscore.
DOES NOT choose or return a UPRN.
"""
if address_column not in df.columns:
raise ValueError(f"Missing column: {address_column}")
if uprn_column not in df.columns:
raise ValueError(f"Missing column: {uprn_column}")
out = df.copy()
user_norm = AddressMatch.normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(
lambda x: AddressMatch.levenshtein(user_norm, x)
)
# Normalise UPRN to string
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
# Rank: 1 = best match
out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)
return out.sort_values(
["lexirank", "lexiscore"],
ascending=[True, False],
)
def get_uprn_with_epc_df( def get_uprn_with_epc_df(
user_inputed_address: str, user_inputed_address: str,
epc_df: pd.DataFrame, epc_df: pd.DataFrame,

View file

@ -0,0 +1,57 @@
import pandas as pd
from backend.utils.addressMatch import AddressMatch
def df_has_single_uprn(df: pd.DataFrame, uprn: str, column: str = "uprn") -> bool:
"""
Returns True if all non-null UPRNs in df match the given uprn.
Returns False otherwise.
"""
if column not in df.columns:
return False
uprns = df[column].dropna().astype(str).str.strip().unique()
if len(uprns) == 0:
return False
return len(uprns) == 1 and uprns[0] == str(uprn)
def get_uprn_candidates(
df: pd.DataFrame,
user_address: str,
address_column: str = "address",
uprn_column: str = "uprn",
) -> pd.DataFrame:
"""
Annotate EPC results with lexicographical similarity scores and ranks.
Returns a DataFrame sorted by descending lexiscore.
DOES NOT choose or return a UPRN.
"""
if address_column not in df.columns:
raise ValueError(f"Missing column: {address_column}")
if uprn_column not in df.columns:
raise ValueError(f"Missing column: {uprn_column}")
out = df.copy()
user_norm = AddressMatch.normalise_address(user_address)
out["lexiscore"] = out[address_column].apply(
lambda x: AddressMatch.levenshtein(user_norm, x)
)
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
out["lexirank"] = out["lexiscore"].rank(method="dense", ascending=False).astype(int)
return out.sort_values(
["lexirank", "lexiscore"],
ascending=[True, False],
)

View file

@ -80,6 +80,7 @@ class Settings(BaseSettings):
OSMOSIS_ACD_SHAREPOINT_ID: Optional[str] = None OSMOSIS_ACD_SHAREPOINT_ID: Optional[str] = None
PRIVATE_PAY_SHAREPOINT_ID: Optional[str] = None PRIVATE_PAY_SHAREPOINT_ID: Optional[str] = None
SOCIAL_HOUSING_WAVE_3_SHAREPOINT_ID: Optional[str] = None SOCIAL_HOUSING_WAVE_3_SHAREPOINT_ID: Optional[str] = None
OPENAI_API_KEY: Optional[str] = None
# Pas Hub # Pas Hub
PASHUB_EMAIL: Optional[str] = None PASHUB_EMAIL: Optional[str] = None

View file

@ -2,8 +2,8 @@ from fastapi import APIRouter, HTTPException, status
from jose import jwt, jwe from jose import jwt, jwe
import json import json
import datetime import datetime
from app.config import get_settings from backend.app.config import get_settings
from app.dependencies import get_derived_encryption_key from backend.app.dependencies import get_derived_encryption_key
router = APIRouter( router = APIRouter(
prefix="/local", prefix="/local",
@ -27,7 +27,12 @@ def create_dummy_token(secret: str) -> str:
"dbId": "known_id", "dbId": "known_id",
} }
token = jwe.encrypt(json.dumps(claims), get_derived_encryption_key(secret), algorithm="dir", encryption="A256GCM") token = jwe.encrypt(
json.dumps(claims),
get_derived_encryption_key(secret),
algorithm="dir",
encryption="A256GCM",
)
return token return token
@ -40,6 +45,8 @@ async def dummy_token():
async def dummy_token(): async def dummy_token():
settings = get_settings() settings = get_settings()
if settings.ENVIRONMENT != "local": if settings.ENVIRONMENT != "local":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, raise HTTPException(
detail="Dummy token can only be generated in local environment") status_code=status.HTTP_403_FORBIDDEN,
detail="Dummy token can only be generated in local environment",
)
return {"dummy_token": create_dummy_token(settings.SECRET_KEY)} return {"dummy_token": create_dummy_token(settings.SECRET_KEY)}

View file

@ -30,10 +30,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
logger.error(f"Validation Errors: {exc.errors()}") logger.error(f"Validation Errors: {exc.errors()}")
return JSONResponse( return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({ content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
"detail": exc.errors(),
"body": exc.body
}),
) )
@ -63,7 +60,8 @@ app.include_router(tasks_router.router, prefix="/v1")
app.include_router(bulk_uploads_router.router, prefix="/v1") app.include_router(bulk_uploads_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local": if get_settings().ENVIRONMENT == "local":
from app.local import router as local_router from backend.app.local import router as local_router
app.include_router(local_router.router) app.include_router(local_router.router)
handler = Mangum(app) handler = Mangum(app)
@ -98,10 +96,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
logger.error(f"Validation Errors: {exc.errors()}") logger.error(f"Validation Errors: {exc.errors()}")
return JSONResponse( return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({ content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
"detail": exc.errors(),
"body": exc.body
}),
) )
@ -130,7 +125,8 @@ app.include_router(whlg_router.router, prefix="/v1")
app.include_router(bulk_uploads_router.router, prefix="/v1") app.include_router(bulk_uploads_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local": if get_settings().ENVIRONMENT == "local":
from app.local import router as local_router from backend.app.local import router as local_router
app.include_router(local_router.router) app.include_router(local_router.router)
handler = Mangum(app) handler = Mangum(app)

0
backend/etl/__init__.py Normal file
View file

View file

@ -0,0 +1,14 @@
This website https://epc.opendatacommunities.org/ has closed down on 30th May 2026
So we downloaded the data and moved everything to S3 ( s3://retrofit-data-dev/histroical_epc/0_master_backup/ )
This scripts assumes the following:
1) You downloaded the master copy, uncompressed it and set it to a path so we can read the csv
The script funciton is:
1) reads csv for all data, seperate each iteration by postcode
2) compresses the csv and save it in the location
3) location s3://retrofit-data-dev/historical_epc/<postcode>/compressed data.csv

View file

@ -0,0 +1,133 @@
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from io import BytesIO
from pathlib import Path
from typing import Any
import boto3
import pandas as pd
from botocore.config import Config
from tqdm import tqdm
from utils.logger import setup_logger
logger = setup_logger()
SRC_ROOT = Path("/workspaces/home/epc_data")
TMP_ROOT = Path("/tmp/epc_postcodes")
S3_BUCKET = "retrofit-data-dev"
S3_PREFIX = "historical_epc"
# This scripts assume you downloading the zip, unzip it, and running it locally
def sanitise(pc: pd.Series) -> pd.Series:
return pc.astype("string").str.upper().str.replace(" ", "", regex=False)
def shard_la(la_dir: Path) -> None:
certs = pd.read_csv(la_dir / "certificates.csv", low_memory=False)
certs["POSTCODE_CLEAN"] = sanitise(certs["POSTCODE"])
before = len(certs)
certs = certs.dropna(subset=["POSTCODE_CLEAN"])
certs = certs[certs["POSTCODE_CLEAN"] != ""]
dropped = before - len(certs)
if dropped:
logger.warning(f"{la_dir.name}: dropped {dropped} rows with empty postcode")
for pc, group in certs.groupby("POSTCODE_CLEAN", sort=False):
out = TMP_ROOT / f"{pc}.csv"
group.drop(columns=["POSTCODE_CLEAN"]).to_csv(
out, mode="a", header=not out.exists(), index=False
)
def list_existing_keys(s3: Any) -> set[str]:
existing: set[str] = set()
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=f"{S3_PREFIX}/")
for page in tqdm(pages, desc="list s3"):
for obj in page.get("Contents", []):
existing.add(obj["Key"])
logger.info(f"Found {len(existing)} existing objects under {S3_PREFIX}/")
return existing
def upload_postcode(path: Path, s3: Any) -> None:
df = pd.read_csv(path, low_memory=False).drop_duplicates()
dupes = df["LMK_KEY"].value_counts()
bad = dupes[dupes > 1]
if not bad.empty:
raise ValueError(
f"Postcode {path.stem}: LMK_KEY appears with conflicting cert data: "
f"{bad.index.tolist()[:5]}"
)
buf = BytesIO()
df.to_csv(buf, index=False, compression="gzip")
s3.put_object(
Bucket=S3_BUCKET,
Key=f"{S3_PREFIX}/{path.stem}/data.csv.gz",
Body=buf.getvalue(),
ContentType="text/csv",
ContentEncoding="gzip",
)
def main():
TMP_ROOT.mkdir(parents=True, exist_ok=True)
la_dirs = sorted(
p for p in SRC_ROOT.iterdir() if p.is_dir() and p.name.startswith("domestic-")
)
logger.info(f"Sharding {len(la_dirs)} LA folders -> {TMP_ROOT}")
for la in tqdm(la_dirs, desc="shard"):
shard_la(la)
s3 = boto3.client(
"s3",
config=Config(
max_pool_connections=512, retries={"max_attempts": 5, "mode": "standard"}
),
)
pc_files = sorted(TMP_ROOT.glob("*.csv"))
logger.info(f"Found {len(pc_files)} local shards")
existing = list_existing_keys(s3)
todo = [p for p in pc_files if f"{S3_PREFIX}/{p.stem}/data.csv.gz" not in existing]
skipped = len(pc_files) - len(todo)
logger.info(
f"Uploading {len(todo)} shards (skipping {skipped} already in S3) -> "
f"s3://{S3_BUCKET}/{S3_PREFIX}/"
)
workers = 256
todo_iter = iter(todo)
inflight: dict[Any, Path] = {}
pbar = tqdm(total=len(todo), desc="upload")
with ThreadPoolExecutor(max_workers=workers) as pool:
for _ in range(workers * 2):
pc = next(todo_iter, None)
if pc is None:
break
inflight[pool.submit(upload_postcode, pc, s3)] = pc
while inflight:
done, _ = wait(inflight.keys(), return_when=FIRST_COMPLETED)
for fut in done:
pc = inflight.pop(fut)
try:
fut.result()
except Exception as e:
logger.error(f"{pc.name}: {e}")
raise
pbar.update(1)
nxt = next(todo_iter, None)
if nxt is not None:
inflight[pool.submit(upload_postcode, nxt, s3)] = nxt
pbar.close()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,98 @@
from dataclasses import dataclass
@dataclass
class HistoricEpc:
lmk_key: str
address1: str
address2: str
address3: str
postcode: str
building_reference_number: str
current_energy_rating: str
potential_energy_rating: str
current_energy_efficiency: str
potential_energy_efficiency: str
property_type: str
built_form: str
inspection_date: str
local_authority: str
constituency: str
county: str
lodgement_date: str
transaction_type: str
environment_impact_current: str
environment_impact_potential: str
energy_consumption_current: str
energy_consumption_potential: str
co2_emissions_current: str
co2_emiss_curr_per_floor_area: str
co2_emissions_potential: str
lighting_cost_current: str
lighting_cost_potential: str
heating_cost_current: str
heating_cost_potential: str
hot_water_cost_current: str
hot_water_cost_potential: str
total_floor_area: str
energy_tariff: str
mains_gas_flag: str
floor_level: str
flat_top_storey: str
flat_storey_count: str
main_heating_controls: str
multi_glaze_proportion: str
glazed_type: str
glazed_area: str
extension_count: str
number_habitable_rooms: str
number_heated_rooms: str
low_energy_lighting: str
number_open_fireplaces: str
hotwater_description: str
hot_water_energy_eff: str
hot_water_env_eff: str
floor_description: str
floor_energy_eff: str
floor_env_eff: str
windows_description: str
windows_energy_eff: str
windows_env_eff: str
walls_description: str
walls_energy_eff: str
walls_env_eff: str
secondheat_description: str
sheating_energy_eff: str
sheating_env_eff: str
roof_description: str
roof_energy_eff: str
roof_env_eff: str
mainheat_description: str
mainheat_energy_eff: str
mainheat_env_eff: str
mainheatcont_description: str
mainheatc_energy_eff: str
mainheatc_env_eff: str
lighting_description: str
lighting_energy_eff: str
lighting_env_eff: str
main_fuel: str
wind_turbine_count: str
heat_loss_corridor: str
unheated_corridor_length: str
floor_height: str
photo_supply: str
solar_water_heating_flag: str
mechanical_ventilation: str
address: str
local_authority_label: str
constituency_label: str
posttown: str
construction_age_band: str
lodgement_datetime: str
tenure: str
fixed_lighting_outlets_count: str
low_energy_fixed_light_count: str
uprn: str
uprn_source: str
report_type: str

View file

@ -0,0 +1,104 @@
from dataclasses import dataclass
from typing import Optional
import pandas as pd
from botocore.exceptions import ClientError
from backend.address2UPRN.scoring import get_uprn_candidates
from backend.utils.addressMatch import AddressMatch
from datatypes.epc.domain.historic_epc import HistoricEpc
from utils.pandas_utils import pandas_cell_to_str
from utils.s3 import parse_s3_uri, read_csv_gz_from_s3
DEFAULT_S3_ROOT = "s3://retrofit-data-dev/historical_epc"
_EXTRA_COLS = {"lexiscore", "lexirank"}
def _map_historic_epc_pandas_row_to_domain(row: pd.Series) -> HistoricEpc:
kwargs = {
col.lower(): pandas_cell_to_str(val)
for col, val in row.items()
if col.lower() not in _EXTRA_COLS
}
return HistoricEpc(**kwargs)
@dataclass(frozen=True)
class ScoredHistoricEpc:
record: HistoricEpc
lexiscore: float
lexirank: int
@dataclass
class HistoricEpcMatches:
user_address: str
postcode: str
matches: list[ScoredHistoricEpc]
def top(self) -> Optional[ScoredHistoricEpc]:
return self.matches[0] if self.matches else None
def top_n(self, k: int) -> list[ScoredHistoricEpc]:
return self.matches[:k]
def unambiguous_uprn(self) -> Optional[str]:
top = self.top()
if top is None or top.lexiscore <= 0:
return None
rank1 = [m for m in self.matches if m.lexirank == top.lexirank]
uprns = {m.record.uprn for m in rank1 if m.record.uprn}
return next(iter(uprns)) if len(uprns) == 1 else None
def _sanitise_postcode(postcode: str) -> str:
cleaned = (postcode or "").upper().replace(" ", "")
if not cleaned:
raise ValueError("postcode must contain non-whitespace characters")
if not AddressMatch.is_valid_postcode(cleaned):
raise ValueError(f"postcode {cleaned!r} is not a valid UK postcode")
return cleaned
def match_addresses_for_postcode(
user_address: str,
postcode: str,
*,
s3_root: str = DEFAULT_S3_ROOT,
address_column: str = "ADDRESS",
uprn_column: str = "UPRN",
) -> HistoricEpcMatches:
if not user_address:
raise ValueError("user_address must be non-empty")
pc = _sanitise_postcode(postcode)
bucket, root_prefix = parse_s3_uri(s3_root)
key = f"{root_prefix.rstrip('/')}/{pc}/data.csv.gz"
try:
df = read_csv_gz_from_s3(bucket, key)
except ClientError as e:
if e.response.get("Error", {}).get("Code") in ("NoSuchKey", "404"):
raise FileNotFoundError(
f"No historic EPC data at s3://{bucket}/{key}"
) from e
raise
scored = get_uprn_candidates(
df,
user_address=user_address,
address_column=address_column,
uprn_column=uprn_column,
)
matches = [
ScoredHistoricEpc(
record=_map_historic_epc_pandas_row_to_domain(row),
lexiscore=float(row["lexiscore"]),
lexirank=int(row["lexirank"]),
)
for _, row in scored.iterrows()
]
return HistoricEpcMatches(user_address=user_address, postcode=pc, matches=matches)

View file

@ -0,0 +1,239 @@
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
from botocore.exceptions import ClientError
from datatypes.epc.domain import historic_epc_matching as matcher_mod
from datatypes.epc.domain.historic_epc_matching import (
HistoricEpcMatches,
ScoredHistoricEpc,
_sanitise_postcode,
match_addresses_for_postcode,
)
# Columns required by the HistoricEpc dataclass (lower-cased CSV columns).
# The matcher only reads ADDRESS + UPRN to score; everything else is filled
# with "" but must be present for HistoricEpc(**kwargs) to construct.
_FULL_COLUMN_FIELDS = [
"LMK_KEY", "ADDRESS1", "ADDRESS2", "ADDRESS3", "POSTCODE",
"BUILDING_REFERENCE_NUMBER", "CURRENT_ENERGY_RATING", "POTENTIAL_ENERGY_RATING",
"CURRENT_ENERGY_EFFICIENCY", "POTENTIAL_ENERGY_EFFICIENCY", "PROPERTY_TYPE",
"BUILT_FORM", "INSPECTION_DATE", "LOCAL_AUTHORITY", "CONSTITUENCY", "COUNTY",
"LODGEMENT_DATE", "TRANSACTION_TYPE", "ENVIRONMENT_IMPACT_CURRENT",
"ENVIRONMENT_IMPACT_POTENTIAL", "ENERGY_CONSUMPTION_CURRENT",
"ENERGY_CONSUMPTION_POTENTIAL", "CO2_EMISSIONS_CURRENT",
"CO2_EMISS_CURR_PER_FLOOR_AREA", "CO2_EMISSIONS_POTENTIAL",
"LIGHTING_COST_CURRENT", "LIGHTING_COST_POTENTIAL", "HEATING_COST_CURRENT",
"HEATING_COST_POTENTIAL", "HOT_WATER_COST_CURRENT", "HOT_WATER_COST_POTENTIAL",
"TOTAL_FLOOR_AREA", "ENERGY_TARIFF", "MAINS_GAS_FLAG", "FLOOR_LEVEL",
"FLAT_TOP_STOREY", "FLAT_STOREY_COUNT", "MAIN_HEATING_CONTROLS",
"MULTI_GLAZE_PROPORTION", "GLAZED_TYPE", "GLAZED_AREA", "EXTENSION_COUNT",
"NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES", "HOTWATER_DESCRIPTION", "HOT_WATER_ENERGY_EFF",
"HOT_WATER_ENV_EFF", "FLOOR_DESCRIPTION", "FLOOR_ENERGY_EFF", "FLOOR_ENV_EFF",
"WINDOWS_DESCRIPTION", "WINDOWS_ENERGY_EFF", "WINDOWS_ENV_EFF",
"WALLS_DESCRIPTION", "WALLS_ENERGY_EFF", "WALLS_ENV_EFF",
"SECONDHEAT_DESCRIPTION", "SHEATING_ENERGY_EFF", "SHEATING_ENV_EFF",
"ROOF_DESCRIPTION", "ROOF_ENERGY_EFF", "ROOF_ENV_EFF", "MAINHEAT_DESCRIPTION",
"MAINHEAT_ENERGY_EFF", "MAINHEAT_ENV_EFF", "MAINHEATCONT_DESCRIPTION",
"MAINHEATC_ENERGY_EFF", "MAINHEATC_ENV_EFF", "LIGHTING_DESCRIPTION",
"LIGHTING_ENERGY_EFF", "LIGHTING_ENV_EFF", "MAIN_FUEL", "WIND_TURBINE_COUNT",
"HEAT_LOSS_CORRIDOR", "UNHEATED_CORRIDOR_LENGTH", "FLOOR_HEIGHT",
"PHOTO_SUPPLY", "SOLAR_WATER_HEATING_FLAG", "MECHANICAL_VENTILATION",
"ADDRESS", "LOCAL_AUTHORITY_LABEL", "CONSTITUENCY_LABEL", "POSTTOWN",
"CONSTRUCTION_AGE_BAND", "LODGEMENT_DATETIME", "TENURE",
"FIXED_LIGHTING_OUTLETS_COUNT", "LOW_ENERGY_FIXED_LIGHT_COUNT", "UPRN",
"UPRN_SOURCE", "REPORT_TYPE",
]
def _row(address: str, uprn) -> dict:
row = {col: "" for col in _FULL_COLUMN_FIELDS}
row["ADDRESS"] = address
row["UPRN"] = uprn
return row
def _build_df(rows: list[dict]) -> pd.DataFrame:
return pd.DataFrame(rows, columns=_FULL_COLUMN_FIELDS)
@pytest.fixture
def patch_postcode_valid():
with patch.object(matcher_mod.AddressMatch, "is_valid_postcode", return_value=True) as m:
yield m
@pytest.fixture
def patch_read():
with patch.object(matcher_mod, "read_csv_gz_from_s3") as m:
yield m
# ---------- _sanitise_postcode ----------
class TestSanitisePostcode:
def test_uppercases_and_strips_spaces(self, patch_postcode_valid):
assert _sanitise_postcode("ab33 8al") == "AB338AL"
def test_empty_raises(self, patch_postcode_valid):
with pytest.raises(ValueError, match="non-whitespace"):
_sanitise_postcode("")
def test_whitespace_only_raises(self, patch_postcode_valid):
with pytest.raises(ValueError, match="non-whitespace"):
_sanitise_postcode(" ")
def test_invalid_postcode_raises(self):
with patch.object(
matcher_mod.AddressMatch, "is_valid_postcode", return_value=False
):
with pytest.raises(ValueError, match="not a valid UK postcode"):
_sanitise_postcode("NONSENSE")
# ---------- match_addresses_for_postcode ----------
class TestMatchAddressesForPostcode:
def test_preserves_row_count_including_zero_score_rows(
self, patch_read, patch_postcode_valid
):
# Disjoint number sets => hard zero. Still kept in matches.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("999 SOMEWHERE ELSE", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert isinstance(result, HistoricEpcMatches)
assert len(result.matches) == 2
def test_top_has_lexirank_one_and_lexiscore_monotone(
self, patch_read, patch_postcode_valid
):
patch_read.return_value = _build_df([
_row("48 GORDON ROAD", "200"), # near miss
_row("47 GORDON ROAD", "100"), # exact (after normalisation)
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.top().lexirank == 1
scores = [m.lexiscore for m in result.matches]
assert scores == sorted(scores, reverse=True)
def test_s3_key_built_from_default_root(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")])
match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
patch_read.assert_called_once_with(
"retrofit-data-dev", "historical_epc/AB338AL/data.csv.gz"
)
def test_s3_key_respects_custom_root_with_trailing_slash(
self, patch_read, patch_postcode_valid
):
patch_read.return_value = _build_df([_row("47 GORDON ROAD", "100")])
match_addresses_for_postcode(
"47 Gordon Road",
"AB33 8AL",
s3_root="s3://my-bucket/some/prefix/",
)
patch_read.assert_called_once_with(
"my-bucket", "some/prefix/AB338AL/data.csv.gz"
)
def test_no_such_key_translates_to_filenotfound(
self, patch_read, patch_postcode_valid
):
patch_read.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey", "Message": "missing"}}, "GetObject"
)
with pytest.raises(FileNotFoundError):
match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
def test_other_client_error_propagates(self, patch_read, patch_postcode_valid):
patch_read.side_effect = ClientError(
{"Error": {"Code": "AccessDenied", "Message": "nope"}}, "GetObject"
)
with pytest.raises(ClientError):
match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
def test_empty_user_address_raises(self, patch_postcode_valid):
with pytest.raises(ValueError, match="user_address"):
match_addresses_for_postcode("", "AB33 8AL")
# ---------- unambiguous_uprn ----------
class TestUnambiguousUprn:
def test_exact_match_returns_uprn(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.unambiguous_uprn() == "100"
def test_ambiguous_tie_returns_none(self, patch_read, patch_postcode_valid):
# Two duplicate addresses with different UPRNs share rank-1.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("47 GORDON ROAD", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert result.unambiguous_uprn() is None
def test_all_zero_score_returns_none_even_when_uprn_unique(
self, patch_read, patch_postcode_valid
):
# User address has building number 47; no row has 47 -> all hard-zero.
patch_read.return_value = _build_df([
_row("999 ELSEWHERE", "100"),
_row("888 ELSEWHERE", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
assert all(m.lexiscore == 0.0 for m in result.matches)
assert result.unambiguous_uprn() is None
def test_nan_uprn_becomes_empty_string_not_nan(
self, patch_read, patch_postcode_valid
):
# Use a real NaN in the UPRN cell.
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", np.nan),
_row("48 GORDON ROAD", "200"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
top = result.top()
# pandas_cell_to_str must turn NaN/"nan" into "" (not the literal string "nan"),
# so unambiguous_uprn's truthiness check correctly drops the row.
assert top.record.uprn == ""
# ---------- top / top_n ----------
class TestTopHelpers:
def test_top_n_returns_first_k(self, patch_read, patch_postcode_valid):
patch_read.return_value = _build_df([
_row("47 GORDON ROAD", "100"),
_row("48 GORDON ROAD", "200"),
_row("49 GORDON ROAD", "300"),
])
result = match_addresses_for_postcode("47 Gordon Road", "AB33 8AL")
top2 = result.top_n(2)
assert len(top2) == 2
assert all(isinstance(m, ScoredHistoricEpc) for m in top2)
def test_top_on_empty_matches_returns_none(self):
empty = HistoricEpcMatches(user_address="x", postcode="AB338AL", matches=[])
assert empty.top() is None
assert empty.top_n(5) == []
assert empty.unambiguous_uprn() is None

View file

View file

@ -0,0 +1,18 @@
import csv
from datatypes.epc.domain.historic_epc import HistoricEpc
def _normalise(value: str | None) -> str:
if value is None:
return ""
return value.replace("\xa0", " ")
def read_historic_epc_csv(path: str) -> list[HistoricEpc]:
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
return [
HistoricEpc(**{k.lower(): _normalise(v) for k, v in row.items()})
for row in reader
]

View file

@ -0,0 +1,49 @@
import os
import pytest
from datatypes.epc.loaders.historic_epc import read_historic_epc_csv
from datatypes.epc.domain.historic_epc import HistoricEpc
FIXTURES = os.path.join(os.path.dirname(__file__), "fixtures")
class TestHistoricEpcLoading:
@pytest.fixture
def epc(self) -> HistoricEpc:
rows = read_historic_epc_csv(os.path.join(FIXTURES, "historic_epc.csv"))
return rows[0]
def test_returns_historic_epc_instance(self, epc: HistoricEpc) -> None:
assert isinstance(epc, HistoricEpc)
def test_lmk_key(self, epc: HistoricEpc) -> None:
assert epc.lmk_key == "9292c3bf26a8876ce59274401ea73e3de5bd0b3e52a507c2162a46e57db8ea2f"
def test_address1(self, epc: HistoricEpc) -> None:
assert epc.address1 == "47 GORDON ROAD"
def test_postcode(self, epc: HistoricEpc) -> None:
assert epc.postcode == "AB33 8AL"
def test_current_energy_rating(self, epc: HistoricEpc) -> None:
assert epc.current_energy_rating == "E"
def test_property_type(self, epc: HistoricEpc) -> None:
assert epc.property_type == "House"
def test_built_form(self, epc: HistoricEpc) -> None:
assert epc.built_form == "Semi-Detached"
def test_inspection_date(self, epc: HistoricEpc) -> None:
assert epc.inspection_date == "2021-04-11"
def test_uprn(self, epc: HistoricEpc) -> None:
assert epc.uprn == "151020766.0"
def test_uprn_source(self, epc: HistoricEpc) -> None:
assert epc.uprn_source == "Energy Assessor"
def test_report_type(self, epc: HistoricEpc) -> None:
assert epc.report_type == "100"

View file

@ -21,28 +21,28 @@ regional_labour_variations = [
{"Region": "Yorkshire and the Humber", "Adjustment_Factor": 0.86}, {"Region": "Yorkshire and the Humber", "Adjustment_Factor": 0.86},
{"Region": "Wales", "Adjustment_Factor": 0.88}, {"Region": "Wales", "Adjustment_Factor": 0.88},
{"Region": "Scotland", "Adjustment_Factor": 0.88}, {"Region": "Scotland", "Adjustment_Factor": 0.88},
{"Region": "Northern Ireland", "Adjustment_Factor": 0.76} {"Region": "Northern Ireland", "Adjustment_Factor": 0.76},
] ]
# Installers are now working with 435 watt panels # Installers are now working with 435 watt panels
PANEL_SIZE = 0.435 PANEL_SIZE = 0.435
INSTALLER_SOLAR_COSTS = [ INSTALLER_SOLAR_COSTS = [
{'n_panels': 4, 'array_kwp': 4 * PANEL_SIZE, 'cost': 4089.25, 'installer': 'CEG'}, {"n_panels": 4, "array_kwp": 4 * PANEL_SIZE, "cost": 4089.25, "installer": "CEG"},
{'n_panels': 5, 'array_kwp': 5 * PANEL_SIZE, 'cost': 4242.48, 'installer': 'CEG'}, {"n_panels": 5, "array_kwp": 5 * PANEL_SIZE, "cost": 4242.48, "installer": "CEG"},
{'n_panels': 6, 'array_kwp': 6 * PANEL_SIZE, 'cost': 4395.71, 'installer': 'CEG'}, {"n_panels": 6, "array_kwp": 6 * PANEL_SIZE, "cost": 4395.71, "installer": "CEG"},
{'n_panels': 7, 'array_kwp': 7 * PANEL_SIZE, 'cost': 4548.94, 'installer': 'CEG'}, {"n_panels": 7, "array_kwp": 7 * PANEL_SIZE, "cost": 4548.94, "installer": "CEG"},
{'n_panels': 8, 'array_kwp': 8 * PANEL_SIZE, 'cost': 4702.17, 'installer': 'CEG'}, {"n_panels": 8, "array_kwp": 8 * PANEL_SIZE, "cost": 4702.17, "installer": "CEG"},
{'n_panels': 9, 'array_kwp': 9 * PANEL_SIZE, 'cost': 4855.41, 'installer': 'CEG'}, {"n_panels": 9, "array_kwp": 9 * PANEL_SIZE, "cost": 4855.41, "installer": "CEG"},
{'n_panels': 10, 'array_kwp': 10 * PANEL_SIZE, 'cost': 5010.95, 'installer': 'CEG'}, {"n_panels": 10, "array_kwp": 10 * PANEL_SIZE, "cost": 5010.95, "installer": "CEG"},
{'n_panels': 11, 'array_kwp': 11 * PANEL_SIZE, 'cost': 5166.49, 'installer': 'CEG'}, {"n_panels": 11, "array_kwp": 11 * PANEL_SIZE, "cost": 5166.49, "installer": "CEG"},
{'n_panels': 12, 'array_kwp': 12 * PANEL_SIZE, 'cost': 5322.04, 'installer': 'CEG'}, {"n_panels": 12, "array_kwp": 12 * PANEL_SIZE, "cost": 5322.04, "installer": "CEG"},
{'n_panels': 13, 'array_kwp': 13 * PANEL_SIZE, 'cost': 5657.6, 'installer': 'CEG'}, {"n_panels": 13, "array_kwp": 13 * PANEL_SIZE, "cost": 5657.6, "installer": "CEG"},
{'n_panels': 14, 'array_kwp': 14 * PANEL_SIZE, 'cost': 5993.16, 'installer': 'CEG'}, {"n_panels": 14, "array_kwp": 14 * PANEL_SIZE, "cost": 5993.16, "installer": "CEG"},
{'n_panels': 15, 'array_kwp': 15 * PANEL_SIZE, 'cost': 6328.71, 'installer': 'CEG'}, {"n_panels": 15, "array_kwp": 15 * PANEL_SIZE, "cost": 6328.71, "installer": "CEG"},
{'n_panels': 16, 'array_kwp': 16 * PANEL_SIZE, 'cost': 6483.33, 'installer': 'CEG'}, {"n_panels": 16, "array_kwp": 16 * PANEL_SIZE, "cost": 6483.33, "installer": "CEG"},
{'n_panels': 17, 'array_kwp': 17 * PANEL_SIZE, 'cost': 6637.95, 'installer': 'CEG'}, {"n_panels": 17, "array_kwp": 17 * PANEL_SIZE, "cost": 6637.95, "installer": "CEG"},
{'n_panels': 18, 'array_kwp': 18 * PANEL_SIZE, 'cost': 6792.57, 'installer': 'CEG'} {"n_panels": 18, "array_kwp": 18 * PANEL_SIZE, "cost": 6792.57, "installer": "CEG"},
] ]
# These are costs we received from CRG, for pricing up air source heat pumps # These are costs we received from CRG, for pricing up air source heat pumps
@ -80,7 +80,12 @@ INSTALLER_SOLAR_PV_INVERTER_COST = 7500
INSTALLER_SOLAR_PV_INVERTER_LABOUR_COST = 500 # Just a rough guess to labour costs INSTALLER_SOLAR_PV_INVERTER_LABOUR_COST = 500 # Just a rough guess to labour costs
INSTALLER_SOLAR_BATTERY_COSTS = [ INSTALLER_SOLAR_BATTERY_COSTS = [
{'capacity_kwh': 5, 'description': 'Battery Add on', 'cost': 3769.89, 'installer': 'JJC'}, {
"capacity_kwh": 5,
"description": "Battery Add on",
"cost": 3769.89,
"installer": "JJC",
},
# {'capacity_kwh': 10, 'description': 'Battery Add on', 'cost': 4300.00, 'installer': 'CEG'}, # {'capacity_kwh': 10, 'description': 'Battery Add on', 'cost': 4300.00, 'installer': 'CEG'},
# {'capacity_kwh': 5, 'description': 'Battery Retrofit existing system', 'cost': 4250.00, 'installer': 'CEG'}, # {'capacity_kwh': 5, 'description': 'Battery Retrofit existing system', 'cost': 4250.00, 'installer': 'CEG'},
# {'capacity_kwh': 10, 'description': 'Battery Retrofit Existing system', 'cost': 5950.00, 'installer': 'CEG'} # {'capacity_kwh': 10, 'description': 'Battery Retrofit Existing system', 'cost': 5950.00, 'installer': 'CEG'}
@ -102,10 +107,14 @@ TTZC_SMART_THERMOSTAT_LABOUR_HOURS = 2
TTZC_ELECTRICIAN_HOURLY_RATE = 45 TTZC_ELECTRICIAN_HOURLY_RATE = 45
# Based on cost of a Nest temperature sensor # Based on cost of a Nest temperature sensor
TTZC_ROOM_TEMPERATURE_SENSOR_COST = 50 TTZC_ROOM_TEMPERATURE_SENSOR_COST = 50
TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS = 0.17 # (Assume ~ 10 mins install per sensor) TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS = (
0.17 # (Assume ~ 10 mins install per sensor)
)
# Basedon an average cost of smart radiator values # Basedon an average cost of smart radiator values
TTZC_SMART_RADIATOR_VALUES = 50 TTZC_SMART_RADIATOR_VALUES = 50
TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS = 0.37 # (Assume ~ 15-30 mins install per valve) TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS = (
0.37 # (Assume ~ 15-30 mins install per valve)
)
# boiler prices based on # boiler prices based on
# This is the cost of a firs time central heating install from The Warm Front rate card # This is the cost of a firs time central heating install from The Warm Front rate card
@ -169,7 +178,7 @@ class Costs:
"heater_removal": 0.1, "heater_removal": 0.1,
"sealing_open_fireplace": 0.1, "sealing_open_fireplace": 0.1,
"mechanical_ventilation": 0.26, "mechanical_ventilation": 0.26,
"sloping_ceiling_insulation": 0.26 # Similar to IWI so using the same contingency "sloping_ceiling_insulation": 0.26, # Similar to IWI so using the same contingency
} }
# Preliminaries are a percentage of the total cost of the work and covers the cost of site-specific costs # Preliminaries are a percentage of the total cost of the work and covers the cost of site-specific costs
@ -195,36 +204,46 @@ class Costs:
:param property_instance: Instance of a Property class containing relevant details like wall area. :param property_instance: Instance of a Property class containing relevant details like wall area.
""" """
if not hasattr(property_instance, 'insulation_wall_area'): if not hasattr(property_instance, "insulation_wall_area"):
raise ValueError("Property instance must have an 'insulation_wall_area' attribute") raise ValueError(
"Property instance must have an 'insulation_wall_area' attribute"
)
self.property = property_instance self.property = property_instance
self.regional_labour_variations = regional_labour_variations self.regional_labour_variations = regional_labour_variations
self.region = county_to_region_map.get(self.property.epc_record.county, None) self.region = county_to_region_map.get(self.property.epc_record.county, None)
if self.region is None: if self.region is None:
# Try and grab using the local-authority-label # Try and grab using the local-authority-label
self.region = county_to_region_map.get(self.property.epc_record.local_authority_label, None) self.region = county_to_region_map.get(
self.property.epc_record.local_authority_label, None
)
if self.region is None: if self.region is None:
# Try and get the region after converting the keys to lower # Try and get the region after converting the keys to lower
self.region = { if self.property.epc_record.local_authority_label is not None:
k.lower(): v for k, v in county_to_region_map.items() self.region = {
}.get(self.property.epc_record.local_authority_label.lower(), None) k.lower(): v for k, v in county_to_region_map.items()
}.get(self.property.epc_record.local_authority_label.lower(), None)
if self.region is None: if self.region is None:
logger.warning("No region found for county %s, defaulting to South East England", logger.warning(
self.property.epc_record.county) "No region found for county %s, defaulting to South East England",
self.property.epc_record.county,
)
self.region = "South East England" self.region = "South East England"
self.labour_adjustment_factor = [ self.labour_adjustment_factor = [
x["Adjustment_Factor"] for x in self.regional_labour_variations if x["Adjustment_Factor"]
x["Region"] == self.region for x in self.regional_labour_variations
if x["Region"] == self.region
][0] ][0]
if not self.labour_adjustment_factor: if not self.labour_adjustment_factor:
raise ValueError("Labour adjustment factor not found") raise ValueError("Labour adjustment factor not found")
def cavity_wall_insulation(self, wall_area, material, is_extraction_and_refill=False): def cavity_wall_insulation(
self, wall_area, material, is_extraction_and_refill=False
):
""" """
Calculates the total cost for cavity wall insulation based on material and labor costs, Calculates the total cost for cavity wall insulation based on material and labor costs,
including contingency, preliminaries, profit, and VAT. including contingency, preliminaries, profit, and VAT.
@ -318,7 +337,8 @@ class Costs:
return { return {
"total": total_cost, "total": total_cost,
"contingency": self.CONTINGENCIES["suspended_floor_insulation"] * total_cost, "contingency": self.CONTINGENCIES["suspended_floor_insulation"]
* total_cost,
"contingency_rate": self.CONTINGENCIES["suspended_floor_insulation"], "contingency_rate": self.CONTINGENCIES["suspended_floor_insulation"],
"labour_hours": labour_hours, "labour_hours": labour_hours,
"labour_days": labour_days, "labour_days": labour_days,
@ -370,8 +390,7 @@ class Costs:
# - Apply sub-linear scaling for realism # - Apply sub-linear scaling for realism
# - Enforce a minimum duration so estimates are not unrealistically low # - Enforce a minimum duration so estimates are not unrealistically low
labour_days = max( labour_days = max(
min_days, min_days, base_days * (insulation_floor_area / base_area) ** labour_exponent
base_days * (insulation_floor_area / base_area) ** labour_exponent
) )
return labour_days return labour_days
@ -388,7 +407,9 @@ class Costs:
total_cost = material["total_cost"] * insulation_floor_area total_cost = material["total_cost"] * insulation_floor_area
daily_labour_rate = 300 # Based on checkatrade daily_labour_rate = 300 # Based on checkatrade
labour_days = self._estimate_number_of_days_for_solid_floor(insulation_floor_area) labour_days = self._estimate_number_of_days_for_solid_floor(
insulation_floor_area
)
labour_cost = labour_days * daily_labour_rate labour_cost = labour_days * daily_labour_rate
total_cost = total_cost + labour_cost total_cost = total_cost + labour_cost
@ -404,7 +425,6 @@ class Costs:
} }
def low_energy_lighting(self, number_of_lights, material): def low_energy_lighting(self, number_of_lights, material):
""" """
Calculates the total cost for low energy lighting based on material and labor costs, Calculates the total cost for low energy lighting based on material and labor costs,
including contingency, preliminaries, profit, and VAT. including contingency, preliminaries, profit, and VAT.
@ -419,7 +439,7 @@ class Costs:
total_cost = material["total_cost"] * number_of_lights total_cost = material["total_cost"] * number_of_lights
labour_hours = 1 labour_hours = 1
labour_days = (labour_hours / 8) labour_days = labour_hours / 8
return { return {
"total": total_cost, "total": total_cost,
@ -450,26 +470,22 @@ class Costs:
} }
@classmethod @classmethod
def solar_pv( def solar_pv(cls, solar_product, scaffolding_options, n_floors):
cls, """ """
solar_product,
scaffolding_options,
n_floors
):
"""
"""
system_cost = solar_product["total_cost"] system_cost = solar_product["total_cost"]
if not solar_product["includes_scaffolding"]: if not solar_product["includes_scaffolding"]:
# We base this on the number of floors # We base this on the number of floors
scaffolding = [x["total_cost"] for x in scaffolding_options if x["size"] == n_floors] scaffolding = [
x["total_cost"] for x in scaffolding_options if x["size"] == n_floors
]
if not scaffolding: if not scaffolding:
# If we have no options, handle this # If we have no options, handle this
if n_floors <= 3: if n_floors <= 3:
raise ValueError("No scaffolding options available for 3 or fewer floors") raise ValueError(
"No scaffolding options available for 3 or fewer floors"
)
# We take the largest scaffolding option available # We take the largest scaffolding option available
scaffolding_cost = max([x["total_cost"] for x in scaffolding_options]) scaffolding_cost = max([x["total_cost"] for x in scaffolding_options])
else: else:
@ -523,9 +539,9 @@ class Costs:
We base the estimates for the cost of electric room heaters on the cost per room as estimated by the We base the estimates for the cost of electric room heaters on the cost per room as estimated by the
following article: following article:
https://www.bestelectricradiators.co.uk/blog/cost-to-install-a-new-heating-system-uk/ https://www.bestelectricradiators.co.uk/blog/cost-to-install-a-new-heating-system-uk/
:param number_heated_rooms: int, number of rooms to be heated :param number_heated_rooms: int, number of rooms to be heated
:return: :return:
""" """
total_cost = 500 * number_heated_rooms total_cost = 500 * number_heated_rooms
@ -547,11 +563,11 @@ class Costs:
} }
def high_heat_electric_storage_heaters( def high_heat_electric_storage_heaters(
self, number_heated_rooms: int, self,
number_heated_rooms: int,
needs_cylinder: bool, needs_cylinder: bool,
product: dict | None = None product: dict | None = None,
): ):
""" """
We base the estimates for the cost of electric storage heaters on the cost per room as estimated by the We base the estimates for the cost of electric storage heaters on the cost per room as estimated by the
energy saving trust energy saving trust
@ -578,8 +594,11 @@ class Costs:
return { return {
"total": total_cost, "total": total_cost,
"contingency": total_cost * self.CONTINGENCIES["high_heat_retention_storage_heaters"], "contingency": total_cost
"contingency_rate": self.CONTINGENCIES["high_heat_retention_storage_heaters"], * self.CONTINGENCIES["high_heat_retention_storage_heaters"],
"contingency_rate": self.CONTINGENCIES[
"high_heat_retention_storage_heaters"
],
"subtotal": subtotal_before_vat, "subtotal": subtotal_before_vat,
"vat": vat, "vat": vat,
"labour_hours": labour_hours, "labour_hours": labour_hours,
@ -690,14 +709,14 @@ class Costs:
# The product costs are inclusive of VAT # The product costs are inclusive of VAT
product_costs = ( product_costs = (
TTZC_SMART_THERMOSTAT_COST + TTZC_SMART_THERMOSTAT_COST
TTZC_ROOM_TEMPERATURE_SENSOR_COST * number_heated_rooms + + TTZC_ROOM_TEMPERATURE_SENSOR_COST * number_heated_rooms
TTZC_SMART_RADIATOR_VALUES * number_heated_rooms + TTZC_SMART_RADIATOR_VALUES * number_heated_rooms
) )
labour_hours = ( labour_hours = (
TTZC_SMART_THERMOSTAT_LABOUR_HOURS + TTZC_SMART_THERMOSTAT_LABOUR_HOURS
TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS * number_heated_rooms + + TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS * number_heated_rooms
TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS * number_heated_rooms + TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS * number_heated_rooms
) )
labour_costs = TTZC_ELECTRICIAN_HOURLY_RATE * labour_hours labour_costs = TTZC_ELECTRICIAN_HOURLY_RATE * labour_hours
# Add continency and preliminaries to the labour to account for the complexity of the job # Add continency and preliminaries to the labour to account for the complexity of the job
@ -722,7 +741,9 @@ class Costs:
"labour_days": labour_days, "labour_days": labour_days,
} }
def programmer_trvs_bypass(self, number_heated_rooms, has_programmer, has_trvs, has_bypass): def programmer_trvs_bypass(
self, number_heated_rooms, has_programmer, has_trvs, has_bypass
):
total_cost = 0 total_cost = 0
labour_hours = 0 labour_hours = 0
@ -779,7 +800,9 @@ class Costs:
} }
@staticmethod @staticmethod
def _estimate_n_radiators(number_habitable_rooms, total_floor_area, property_type, built_form): def _estimate_n_radiators(
number_habitable_rooms, total_floor_area, property_type, built_form
):
# Base number of radiators: one per habitable room # Base number of radiators: one per habitable room
base_radiators = number_habitable_rooms base_radiators = number_habitable_rooms
@ -787,34 +810,49 @@ class Costs:
additional_radiators = 3 # Initial assumption additional_radiators = 3 # Initial assumption
# Adjust additional radiators based on property type # Adjust additional radiators based on property type
if property_type == 'Flat': if property_type == "Flat":
additional_radiators -= 1 # Flats may need fewer radiators due to less exposure additional_radiators -= (
elif property_type in ['House', 'Bungalow', 'Maisonette']: 1 # Flats may need fewer radiators due to less exposure
)
elif property_type in ["House", "Bungalow", "Maisonette"]:
# Multiple floors in Maisonette may require additional heating points # Multiple floors in Maisonette may require additional heating points
additional_radiators += 2 # Houses and bungalows might need more due to greater exposure additional_radiators += (
2 # Houses and bungalows might need more due to greater exposure
)
else: else:
raise Exception("Invalid property type") raise Exception("Invalid property type")
# Adjust total radiator needs based on built form # Adjust total radiator needs based on built form
form_factor = { form_factor = {
'Enclosed Mid-Terrace': 0.9, "Enclosed Mid-Terrace": 0.9,
'Mid-Terrace': 0.95, "Mid-Terrace": 0.95,
'Enclosed End-Terrace': 0.95, "Enclosed End-Terrace": 0.95,
'Semi-Detached': 1.05, "Semi-Detached": 1.05,
'Detached': 1.25, "Detached": 1.25,
'End-Terrace': 1.05 "End-Terrace": 1.05,
} }
# Calculate total heating power needed and number of radiators based on standard output # Calculate total heating power needed and number of radiators based on standard output
total_heating_power_required = total_floor_area * 80 # Watts per square meter total_heating_power_required = total_floor_area * 80 # Watts per square meter
radiator_output = 1000 # Average wattage per radiator radiator_output = 1000 # Average wattage per radiator
total_radiators_based_on_power = (total_heating_power_required / radiator_output) * form_factor[built_form] total_radiators_based_on_power = (
total_heating_power_required / radiator_output
) * form_factor[built_form]
# Final estimation taking the higher of calculated needs or base room count # Final estimation taking the higher of calculated needs or base room count
estimated_radiators = max(total_radiators_based_on_power, base_radiators + additional_radiators) estimated_radiators = max(
total_radiators_based_on_power, base_radiators + additional_radiators
)
return round(estimated_radiators) return round(estimated_radiators)
def boiler(self, exising_room_heaters, system_change, n_heated_rooms, n_rooms, is_electric=False): def boiler(
self,
exising_room_heaters,
system_change,
n_heated_rooms,
n_rooms,
is_electric=False,
):
""" """
Based on a basic estimate of median value £2600 to install a low carbon combi boiler Based on a basic estimate of median value £2600 to install a low carbon combi boiler
First time central heating vosts can als be found here: First time central heating vosts can als be found here:
@ -859,12 +897,14 @@ class Costs:
number_habitable_rooms=n_rooms, number_habitable_rooms=n_rooms,
total_floor_area=self.property.floor_area, total_floor_area=self.property.floor_area,
property_type=self.property.epc_record.property_type, property_type=self.property.epc_record.property_type,
built_form=self.property.epc_record.built_form built_form=self.property.epc_record.built_form,
) )
additionals_labour_cost = labour_rate * self.labour_adjustment_factor additionals_labour_cost = labour_rate * self.labour_adjustment_factor
radiator_cost = DOUBLE_RADIATOR_COST * n_radiators radiator_cost = DOUBLE_RADIATOR_COST * n_radiators
system_change_cost = radiator_cost + FLUE_COST + PIPEWORK_COST + additionals_labour_cost system_change_cost = (
radiator_cost + FLUE_COST + PIPEWORK_COST + additionals_labour_cost
)
system_change_cost_before_vat = system_change_cost / (1 + self.VAT_RATE) system_change_cost_before_vat = system_change_cost / (1 + self.VAT_RATE)
system_change_vat = system_change_cost - system_change_cost_before_vat system_change_vat = system_change_cost - system_change_cost_before_vat
# We add an extra labour day for the system change # We add an extra labour day for the system change
@ -897,14 +937,18 @@ class Costs:
else: else:
return 250 return 250
def air_source_heat_pump(self, ashp_size: float, number_heated_rooms: int, total_floor_area: float) -> dict: def air_source_heat_pump(
self, ashp_size: float, number_heated_rooms: int, total_floor_area: float
) -> dict:
""" """
We produce a cost estimation for an air source heat pump, based on costs we have received from installers. We produce a cost estimation for an air source heat pump, based on costs we have received from installers.
""" """
system_cost = ( system_cost = (
(ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST) + ASHP_SECURITY + ASHP_WALL_BRACKET (ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST)
+ ASHP_SECURITY
+ ASHP_WALL_BRACKET
) )
available_n_rads = [x["n_radiators"] for x in ASHP_DISTRIBUTION_SYSTEM_COSTS] available_n_rads = [x["n_radiators"] for x in ASHP_DISTRIBUTION_SYSTEM_COSTS]
@ -940,7 +984,9 @@ class Costs:
} }
@staticmethod @staticmethod
def _estimate_number_of_days_for_sloping_ceiling(insulation_roof_area: float) -> float: def _estimate_number_of_days_for_sloping_ceiling(
insulation_roof_area: float,
) -> float:
""" """
Estimate labour days required to insulate an existing sloping ceiling. Estimate labour days required to insulate an existing sloping ceiling.
@ -965,14 +1011,15 @@ class Costs:
min_days = 2 min_days = 2
labour_days = max( labour_days = max(
min_days, min_days, base_days * (insulation_roof_area / base_area) ** labour_exponent
base_days * (insulation_roof_area / base_area) ** labour_exponent
) )
return labour_days return labour_days
@classmethod @classmethod
def sloping_ceiling_insulation(cls, insulation_roof_area: float) -> Mapping[str, float]: def sloping_ceiling_insulation(
cls, insulation_roof_area: float
) -> Mapping[str, float]:
""" """
This costing for this is based on Checkatrade desktop research, since we are yet to receive installer quotes. This costing for this is based on Checkatrade desktop research, since we are yet to receive installer quotes.
:param insulation_roof_area: Area of the sloping ceiling to be insulated :param insulation_roof_area: Area of the sloping ceiling to be insulated
@ -985,14 +1032,20 @@ class Costs:
# https://www.checkatrade.com/blog/cost-guides/vaulted-ceiling-cost/ # https://www.checkatrade.com/blog/cost-guides/vaulted-ceiling-cost/
# https://www.thegreenage.co.uk/can-i-insulate-my-sloping-ceiling/ # https://www.thegreenage.co.uk/can-i-insulate-my-sloping-ceiling/
# These assumptions last updated 21/02/2026 # These assumptions last updated 21/02/2026
insulation_cost_per_m2 = 52 # The actual install process is quite similar to IWI insulation_cost_per_m2 = (
52 # The actual install process is quite similar to IWI
)
labour_rate = 250 # per day labour_rate = 250 # per day
contingency_rate = cls.CONTINGENCIES["sloping_ceiling_insulation"] contingency_rate = cls.CONTINGENCIES["sloping_ceiling_insulation"]
labour_days = cls._estimate_number_of_days_for_sloping_ceiling(insulation_roof_area) labour_days = cls._estimate_number_of_days_for_sloping_ceiling(
insulation_roof_area
)
labour_hours = labour_days * 8 labour_hours = labour_days * 8
total = (insulation_cost_per_m2 * insulation_roof_area) + (labour_rate * labour_days) total = (insulation_cost_per_m2 * insulation_roof_area) + (
labour_rate * labour_days
)
# Assume VAT included in the total => total is 120% of subtotal # Assume VAT included in the total => total is 120% of subtotal
vat = total - (total / 1.2) vat = total - (total / 1.2)

View file

@ -0,0 +1,47 @@
"""Demo: look up historic EPC records for an address + postcode.
Reads the gzipped CSV at
s3://retrofit-data-dev/historical_epc/<POSTCODE>/data.csv.gz
scores rows against the user-provided address, and prints the top matches.
Usage:
python -m scripts.historic_epc_demo "47 Gordon Road" "AB33 8AL"
python -m scripts.historic_epc_demo # uses defaults below
"""
import sys
from datatypes.epc.domain.historic_epc_matching import match_addresses_for_postcode
def main(user_address: str, postcode: str) -> None:
print(f"Looking up: {user_address!r} @ {postcode!r}\n")
result = match_addresses_for_postcode(user_address, postcode)
print(f"Found {len(result.matches)} candidate row(s).\n")
print("Top 3 matches:")
for m in result.top_n(3):
print(
f" rank={m.lexirank} score={m.lexiscore:.3f} "
f"uprn={m.record.uprn or '(none)':<14} {m.record.address}"
)
print()
uprn = result.unambiguous_uprn()
if uprn:
print(f"Unambiguous UPRN: {uprn}")
else:
print("No unambiguous UPRN (zero-score, tie, or empty result).")
if __name__ == "__main__":
args = sys.argv[1:]
if len(args) == 2:
main(args[0], args[1])
elif len(args) == 0:
main("47 Gordon Road", "AB33 8AL")
else:
print(__doc__)
sys.exit(2)

View file

@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials
from collections import defaultdict from collections import defaultdict
from sqlalchemy import func from sqlalchemy import func
PORTFOLIO_ID = 711 PORTFOLIO_ID = 632
SCENARIOS = [1233] SCENARIOS = [1144]
scenario_names = { scenario_names = {
1233: "Reach EPC C", 1144: "EPC C",
} }
project_name = "Novus" project_name = "Calico Refresh"
def get_data(portfolio_id, scenario_ids): def get_data(portfolio_id, scenario_ids):

14
utils/pandas_utils.py Normal file
View file

@ -0,0 +1,14 @@
from typing import Any
import pandas as pd
def pandas_cell_to_str(v: Any) -> str:
if v is None or (isinstance(v, float) and pd.isna(v)):
return ""
s = str(v).replace("\xa0", " ")
# get_uprn_candidates runs .astype(str) on UPRN, turning NaN into "nan".
# Treat that as missing so unambiguous_uprn truthiness checks work.
if s.lower() == "nan":
return ""
return s

View file

@ -6,8 +6,6 @@ from io import BytesIO, StringIO
from urllib.parse import unquote from urllib.parse import unquote
from utils.logger import setup_logger from utils.logger import setup_logger
from botocore.exceptions import NoCredentialsError, PartialCredentialsError from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from typing import Any
logger = setup_logger() logger = setup_logger()
@ -167,6 +165,21 @@ def read_dataframe_from_s3_parquet(bucket_name, file_key):
return df return df
def read_csv_gz_from_s3(bucket_name: str, file_key: str) -> pd.DataFrame:
"""
Read a gzipped CSV from S3 into a pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (must end in .csv.gz).
:return: A pandas DataFrame.
"""
if not file_key.endswith(".csv.gz"):
raise ValueError("file_key must end with .csv.gz")
buffer = read_io_from_s3(bucket_name=bucket_name, file_key=file_key)
return pd.read_csv(buffer, compression="gzip", low_memory=False)
def save_csv_to_s3(dataframe, bucket_name, file_name): def save_csv_to_s3(dataframe, bucket_name, file_name):
""" """
Save a Pandas DataFrame to a CSV file in an S3 bucket. Save a Pandas DataFrame to a CSV file in an S3 bucket.