This commit is contained in:
Khalim Conn-Kowlessar 2025-02-10 15:41:33 +00:00
parent 7885467fa4
commit 77844c625e
7 changed files with 324 additions and 207 deletions

View file

@ -0,0 +1,61 @@
import os
import pandas as pd
from dotenv import load_dotenv
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.route_march_data_pull.app import get_data
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
addresses = [
{"address": "3 Willis Road", "postcode": "CB1 2AQ"},
{"address": "22 Catharine Street", "postcode": "CB1 3AW"},
{"address": "332 Mill Road", "postcode": "CB1 3NN"},
{"address": "330 Mill Road", "postcode": "CB1 3NN"},
{"address": "328 Mill Road", "postcode": "CB1 3NN"},
{"address": "71 Mill Road", "postcode": "CB1 2AS"},
{"address": "78 Argyle Street", "postcode": "CB1 3LZ"},
{"address": "9 Graham Road", "postcode": "CB4 2ZE"},
{"address": "217 Mill Road", "postcode": "CB1 3BE"},
{"address": "374 Mill Road", "postcode": "CB1 3NN"},
{"address": "174 Thoday Street", "postcode": "CB1 3AX"},
{"address": "37 Abbey Road", "postcode": "CB5 8HH"},
{"address": "18 Upper Gwydir Street", "postcode": "CB1 2LR"},
{"address": "21 Fulbourn Road Fulbourn", "postcode": "CB1 9JL"},
{"address": "108 Argyle Street", "postcode": "CB1 3LS"},
{"address": "115 Victoria Road", "postcode": "CB4 3BS"},
{"address": "55 Ross Street", "postcode": "CB1 3BP"},
{"address": "16 Kingston Street", "postcode": "CB1 2NU"},
{"address": "13 Thoday Street", "postcode": "CB1 3AS"},
{"address": "103 York Street", "postcode": "CB1 2PZ"},
]
asset_list = pd.DataFrame(addresses)
asset_list["row_id"] = asset_list.index
epc_data, _, _ = get_data(
asset_list=asset_list, fulladdress_column="address", postcode_column="postcode", address1_column="address",
manual_uprn_map={}, epc_api_only=True
)
epc_df = pd.DataFrame(epc_data)
epc_df.shape
asset_list = asset_list.merge(
epc_df, how="left", on="row_id"
)
asset_list = asset_list.rename(columns={"address_x": "Address", "postcode_x": "Postcode"})
asset_list["uprn"] = asset_list["uprn"].astype(str)
spatial_data = OpenUprnClient.get_spatial_data([x["uprn"] for x in epc_data], bucket_name="retrofit-data-dev")
spatial_data["UPRN"] = spatial_data["UPRN"].astype(str)
asset_list = asset_list.merge(
spatial_data, how="left", left_on="uprn", right_on="UPRN"
)
asset_list.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Panacap/Acquisitions EPC Data.csv",
index=False)

View file

@ -4,7 +4,7 @@ from dotenv import load_dotenv
from utils.s3 import save_csv_to_s3
from etl.find_my_epc.AssetListEpcData import AssetListEpcData
PORTFOLIO_ID = 126
PORTFOLIO_ID = 127
USER_ID = 8
load_dotenv(dotenv_path="backend/.env")
@ -19,22 +19,9 @@ def app():
asset_list = [
{
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308249,
},
{
"address": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308251
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308250,
"address": "49 Brailsford Road",
"postcode": "M14 6PT",
"uprn": 77145666,
}
]
asset_list = pd.DataFrame(asset_list)
@ -65,18 +52,7 @@ def app():
valuation_data = [
{
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"valuation": 337_000
},
{
"addresss": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"valuation": 337_000
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"uprn": 77145666,
"valuation": 337_000
}
]

View file

@ -3777,7 +3777,6 @@ def revised_model():
no_match = []
matches = []
for _, home in tqdm(new_priority_postcodes.iterrows(), total=len(new_priority_postcodes)):
# We check if the property was surveyed
survey_result = coordinated_packages[
coordinated_packages["Organisation Reference"] == home["Organisation Reference"]
@ -3791,6 +3790,7 @@ def revised_model():
} for m in survey_result["Organisation Reference"].values
]
matches.extend(to_extend)
continue
closest_match = find_nearest_matching_property(coordinated_packages, home)
if closest_match is None:
@ -3821,6 +3821,7 @@ def revised_model():
# (3953, 6), (2948, 6), (2969, 7), (2575, 7)
matches_df = pd.DataFrame(matches)
matches_df = matches_df.merge(
coordinated_packages[["Organisation Reference", "Survey: Current EPC Band", "Survey: Current SAP Rating"]],
left_on="Best Match Organisation Reference", right_on="Organisation Reference",
@ -3837,7 +3838,8 @@ def revised_model():
"Number of matches": 1,
"Proportion": 100,
"Estimated SAP Rating": mapped_matches["Survey: Current SAP Rating"].values[0],
"Estimated EPC Rating": mapped_matches["Survey: Current EPC Band"].values[0]
"Estimated EPC Rating": mapped_matches["Survey: Current EPC Band"].values[0],
"Was Surveyed": mapped_matches["Was Surveyed"].values[0],
}
)
continue
@ -3857,7 +3859,8 @@ def revised_model():
"Number of matches": number_of_matches,
"Proportion": proportion_with_this_epc,
"Estimated SAP Rating": average_rating,
"Estimated EPC Rating": average_epc_rating
"Estimated EPC Rating": average_epc_rating,
"Was Surveyed": False
}
)
@ -3973,7 +3976,8 @@ def revised_model():
'Organisation Reference',
'Best Match Organisation Reference',
'Survey: Current EPC Band',
'Survey: Current SAP Rating'
'Survey: Current SAP Rating',
"Was Surveyed"
]
].rename(
columns={
@ -4027,7 +4031,7 @@ def revised_model():
'Organisation Reference', 'Address', 'Postcode', 'Address ID', 'uprn', 'Archetype ID',
'SAP', 'SAP Band', "Property Type", "Walls", "Roofs", 'Glazing',
'Heating', 'Main Fuel', 'Hot Water', 'Number of matches', 'Proportion',
'Estimated SAP Rating', 'Estimated EPC Rating'
'Estimated SAP Rating', 'Estimated EPC Rating', "Was Surveyed"
]
].rename(
columns={
@ -4092,6 +4096,8 @@ def revised_model():
worksheet["uprn"] = worksheet["uprn"].replace("<NA>", "")
worksheet = worksheet.drop(columns=["Last EPC - uprn"])
# Save to Excel with multiple sheets
excel_path = os.path.join(CUSTOMER_FOLDER_PATH, "Jan 2025 Project", "04022025 Stonewater Priority List.xlsx")
with pd.ExcelWriter(excel_path, engine="xlsxwriter") as writer:

View file

@ -217,78 +217,7 @@ def app():
)
)
# We get the EPC data
# epc_data = json.loads(
# read_from_s3(
# bucket_name="retrofit-data-dev",
# s3_file_name="customers/Stonewater/clustering/epc_data.json"
# )
# )
# epc_data = pd.DataFrame(epc_data)
#
# epc_data["uprn"] = np.where(
# epc_data["internal_id"] == 1091,
# 83143766,
# epc_data["uprn"]
# )
#
# epc_data_batch_2 = read_pickle_from_s3(
# s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
# bucket_name="retrofit-data-dev"
# )
# epc_data_batch_2 = pd.DataFrame(epc_data_batch_2)
#
# complete_epcs = pd.concat([epc_data, epc_data_batch_2])
#
# epcs_to_merge = complete_epcs[
# [
# "uprn",
# "address",
# "postcode",
# "property-type",
# "built-form",
# "inspection-date",
# "current-energy-rating",
# "current-energy-efficiency",
# "roof-description",
# "walls-description",
# "transaction-type",
# "secondheat-description",
# "total-floor-area",
# "construction-age-band",
# "floor-height",
# "number-habitable-rooms",
# "mainheat-description",
# "energy-consumption-current"
# ]
# ].rename(
# columns={
# "address": "Address",
# "postcode": "Postcode",
# "inspection-date": "Date of last EPC",
# "current-energy-efficiency": "SAP score on register",
# "current-energy-rating": "EPC rating on register",
# "property-type": "Property Type",
# "built-form": "Archetype",
# "total-floor-area": "Property Floor Area",
# "construction-age-band": "Property Age Band",
# "floor-height": "Property Floor Height",
# "number-habitable-rooms": "Number of Habitable Rooms",
# "walls-description": "Wall Construction",
# "roof-description": "Roof Construction",
# "mainheat-description": "Heating Type",
# "secondheat-description": "Secondary Heating",
# "transaction-type": "Reason for last EPC",
# "energy-consumption-current": "Heat Demand (kWh/m2)",
# }
# )
# # We de-dupe, taking the newest on the date the EPC was lod
# epcs_to_merge["Date of last EPC"] = pd.to_datetime(epcs_to_merge["Date of last EPC"])
# epcs_to_merge = epcs_to_merge.sort_values("Date of last EPC", ascending=False)
# epcs_to_merge = epcs_to_merge.drop_duplicates(subset="uprn")
stonewater_cavity_properties["UPRN"] = stonewater_cavity_properties["UPRN"].astype("Int64").astype(str)
stonewater_cavity_properties["Reason Included"].value_counts()
# Find the postcodes where an Osmosis survey revealed a need for CWI
postcodes_found_needing_cwi = stonewater_cavity_properties[
stonewater_cavity_properties["Reason Included"].isin(
@ -339,12 +268,7 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
) # .merge(
# epcs_to_merge,
# how="left",
# left_on="UPRN",
# right_on="uprn"
# )
)
# We now flag the additional properties in the as built list
@ -434,12 +358,11 @@ def app():
additional_properties["Suspected Needs CWI - not surveyed"] = (
(
additional_properties["Postcode"].isin(postcodes_found_needing_cwi)
additional_properties["Postcode"].isin(postcodes_found_needing_cwi) &
~additional_properties["Installed under ECO3"]
)
)
additional_properties["Same Postcode as Installed under ECO3"].value_counts()
# We drop Full Address
additional_properties = additional_properties.drop(columns=["Full Address"])
additional_properties2 = additional_properties[[
@ -461,65 +384,57 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
) # .merge(
# pd.DataFrame(additional_properties_epcs)[
# [
# "row_id",
# "property-type",
# "built-form",
# "inspection-date",
# "current-energy-rating",
# "current-energy-efficiency",
# "roof-description",
# "walls-description",
# "transaction-type",
# "secondheat-description",
# "total-floor-area",
# "construction-age-band",
# "floor-height",
# "number-habitable-rooms",
# "mainheat-description",
# "energy-consumption-current"
# ]
# ].rename(
# columns={
# "inspection-date": "Date of last EPC",
# "current-energy-efficiency": "SAP score on register",
# "current-energy-rating": "EPC rating on register",
# "property-type": "Property Type",
# "built-form": "Archetype",
# "total-floor-area": "Property Floor Area",
# "construction-age-band": "Property Age Band",
# "floor-height": "Property Floor Height",
# "number-habitable-rooms": "Number of Habitable Rooms",
# "walls-description": "Wall Construction",
# "roof-description": "Roof Construction",
# "mainheat-description": "Heating Type",
# "secondheat-description": "Secondary Heating",
# "transaction-type": "Reason for last EPC",
# "energy-consumption-current": "Heat Demand (kWh/m2)",
# }
# ),
# how="left",
# on="row_id"
# )
)
# Combine the data:
full_dataset = pd.concat([stonewater_cavity_properties, additional_properties2])
# We not define the priority list for non-intrusives
full_dataset["Postal Region"] = full_dataset["Postcode"].str.split(" ").str[0].str[0:2]
full_dataset["Postal Region 2"] = full_dataset["Postcode"].str.split(" ").str[0]
# Strip out anything we definitely don't want
full_dataset = full_dataset[~full_dataset["Installed under ECO3"]]
areas = full_dataset[full_dataset["Suspected Needs CWI - not surveyed"] == True]["Postal Region 2"].unique()
priorities = full_dataset[
full_dataset["Postal Region 2"].isin(areas)
]
region_prevalance = priorities["Postal Region 2"].value_counts().to_frame().reset_index()
region_prevalance = region_prevalance[region_prevalance["count"] > 100]
df = priorities[priorities["Postal Region 2"].isin(region_prevalance["Postal Region 2"].values)]
df["Postal Region"].value_counts()
df["Postal Region 2"].value_counts()
if df["Installed under ECO3"].sum():
raise ValueError("There are properties in the priority list that were installed under ECO3")
df.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Non-intrusives/10022025 Non-Intrusives - "
"revised list.xlsx",
index=False
)
# We save the data locally
stonewater_cavity_properties.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties - priority "
"postcodes.csv",
index=False
)
additional_properties2.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties - "
"non-priority postcodes.csv",
index=False
)
# Save the survey findings
needs_cwi.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Properties Needing CWI - WIP.csv",
index=False
)
# stonewater_cavity_properties.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties - priority "
# "postcodes.csv",
# index=False
# )
# additional_properties2.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties - "
# "non-priority postcodes.csv",
# index=False
# )
# # Save the survey findings
# needs_cwi.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Properties Needing CWI -
# WIP.csv",
# index=False
# )
def cross_reference_epc_programme():
@ -528,6 +443,12 @@ def cross_reference_epc_programme():
"SURVEYED - ECO3 NOT COMPLETED.xlsx"
)
for _, x in eco3_fallout.iterrows():
house_no = SearchEpc.get_house_number(x["ADDRESS"], "")
if house_no is None:
house_no = x["ADDRESS"].split(",")[0]
x["house_number"] = house_no
eco3_fallout["house_number"] = eco3_fallout.apply(
lambda x: SearchEpc.get_house_number(x["ADDRESS"], ""), axis=1
)
@ -558,3 +479,58 @@ def cross_reference_epc_programme():
stonewater_modelled_above_c["Address"].apply(lambda x: fuzz.ratio(x, property["ADDRESS"]) > 90)
]
match.head()
def finalise_list_for_non_intrusives():
non_intrusives_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Non-intrusives/20250207 Stonewater "
"Non-Intrusives.xlsx"
)
# Remove anything installed under ECO3
non_intrusives_list = non_intrusives_list[~non_intrusives_list["Installed under ECO3"]]
# We make any properties that were surveyed by Osmosis
packages = pd.read_excel(
"/Users/khalimconn-kowlessar/Downloads/Stonewater - Bid Packages WIP 14.11.20 V2 "
"(1).xlsx",
header=13,
sheet_name="Modelled Packages"
)
non_intrusives_list["Surveyed by Osmosis"] = non_intrusives_list["Address ID"].isin(
packages["Address ID"].values
)
# Removed 54 addresses
final_non_intrusives = non_intrusives_list[
~non_intrusives_list["Surveyed by Osmosis"]
]
features = pd.read_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Osmosis Reviewed - Parity Download 18.7 - "
"master sheet.csv",
encoding='latin1'
)
# Add on the orgnisaion reference
final_non_intrusives = final_non_intrusives.merge(
features[["Organisation Reference", "Address ID"]],
how="left",
on="Address ID"
)
final_non_intrusives["Postal Region"] = final_non_intrusives["Postcode"].str.split(" ").str[0].str[0:2]
selected_regions = final_non_intrusives[
final_non_intrusives["Include in non-intrusives"]
]["Postcode"].unique()
final_non_intrusives["Is in region"] = final_non_intrusives["Postcode"].isin(selected_regions)
# Filter down:
final_non_intrusives = final_non_intrusives[
final_non_intrusives["Is in region"]
]
final_non_intrusives.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Non-intrusives/10022025 Non-Intrusives "
"List - final.xlsx")

View file

@ -25,6 +25,7 @@ class RetrieveFindMyEpc:
self.postcode = postcode
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
self.walls = []
@staticmethod
def extract_low_carbon_sources(soup):
@ -102,6 +103,8 @@ class RetrieveFindMyEpc:
# 2) Bills estimates
# 3) Recommendations and SAP points
# 4) Low and zero carbon energy sources
# 5) The wall types of the property - used for determining if we have an extension wall insulation#
# recommendation
ratings = address_res.find('desc', {'id': 'svg-desc'}).text
current_rating = ratings.split(".")[0]
@ -208,6 +211,17 @@ class RetrieveFindMyEpc:
if key not in assessment_data:
raise ValueError(f"Missing key: {key}")
# The wall types of the property
property_features_table = address_res.find("tbody", class_="govuk-table__body")
property_features_table = property_features_table.find_all("tr")
# Extract wall types
self.walls = []
for row in property_features_table:
cells = row.find_all("td")
if row.find("th").text.strip() == "Wall":
self.walls.append(cells[0].text.strip())
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
@ -229,8 +243,7 @@ class RetrieveFindMyEpc:
return resulting_data
@staticmethod
def format_recommendations(recommendations, assessment_data, sap_2012_date=None):
def format_recommendations(self, recommendations, assessment_data, sap_2012_date=None):
"""
This function converts the recommendations to a format that we can use in the engine as a non-intrusive survey
:param recommendations: The recommendations from the EPC
@ -330,6 +343,8 @@ class RetrieveFindMyEpc:
for rec in recommendations:
mapped = measure_map[rec["measure"]]
for measure in mapped:
if measure == "cavity_wall_insulation" and "solid brick" in self.walls[0].lower():
measure = "extension_cavity_wall_insulation"
to_append = {
"type": measure,
"sap_points": rec["sap_points"],

View file

@ -1,5 +1,6 @@
import os
import time
import pickle
import pandas as pd
import numpy as np
@ -20,7 +21,7 @@ load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, epc_api_only=True):
def get_data(asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, epc_api_only=False):
epc_data = []
errors = []
no_epc = []
@ -116,10 +117,14 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e):
find_epc_data = {}
else:
find_epc_data = {}
except Exception as e:
@ -176,19 +181,33 @@ def app():
Property UPRN
"""
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/For Housing"
DATA_FILENAME = "For Housing Data pull.xlsx"
SHEET_NAME = "Sheet1"
POSTCODE_COLUMN = "Post Code"
FULLADDRESS_COLUMN = None
ADDRESS1_COLUMN = "NO."
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Places For People"
DATA_FILENAME = "Regulated Stock - Do Not Change (06.06.24).xlsx"
SHEET_NAME = "Assets 1"
POSTCODE_COLUMN = "Postcode"
FULLADDRESS_COLUMN = "Address"
ADDRESS1_COLUMN = "AddressLine1"
ADDRESS1_METHOD = None
ADDRESS_COLS_TO_CONCAT = ["NO.", "Street / Block Name"]
ADDRESS_COLS_TO_CONCAT = []
MISSING_POSTCODES_METHOD = None
# Maps addresses to uprn in problematic cases
MANUAL_UPRN_MAP = {}
asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
if MISSING_POSTCODES_METHOD is not None:
if MISSING_POSTCODES_METHOD == "last_two_words":
# Replace any double spaces
asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace(' ', ' ', regex=False)
asset_list["Postcode"] = np.where(
pd.isnull(asset_list["Postcode"]),
asset_list[FULLADDRESS_COLUMN].str.split(" ").str[-2:].str.join(" "),
asset_list["Postcode"]
)
else:
raise ValueError(f"Method {MISSING_POSTCODES_METHOD} not recognized")
asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
asset_list["row_id"] = asset_list.index
@ -217,29 +236,46 @@ def app():
asset_list = asset_list[~asset_list["deduper"].duplicated()]
asset_list = asset_list.drop(columns=["deduper"])
epc_data, errors, no_epc = get_data(
asset_list=asset_list,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN,
manual_uprn_map=MANUAL_UPRN_MAP
)
# We chunk up this data into 5000 rows at a time
chunk_size = 5000
epc_data = []
errors = []
no_epc = []
skip = None # Used to skip already completed chunks
for i in range(0, len(asset_list), chunk_size):
print(f"Processing chunk {i} to {i + chunk_size}")
if skip is not None:
if i <= skip:
continue
chunk = asset_list[i:i + chunk_size]
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
asset_list=chunk,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN,
manual_uprn_map=MANUAL_UPRN_MAP
)
# We now retrieve any failed properties
asset_list_failed = asset_list[asset_list["row_id"].isin(errors)]
epc_data_failed, _, _ = get_data(
asset_list=asset_list_failed,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN,
manual_uprn_map=MANUAL_UPRN_MAP
)
# We now retrieve any failed properties
chunk_failed = chunk[chunk["row_id"].isin(errors)]
epc_data_failed, _, _ = get_data(
asset_list=chunk_failed,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN,
manual_uprn_map=MANUAL_UPRN_MAP,
epc_api_only=False
)
no_data = asset_list[asset_list["row_id"].isin(no_epc)]
print(no_data[[FULLADDRESS_COLUMN, POSTCODE_COLUMN]])
epc_data_chunk.extend(epc_data_failed)
errors.extend(errors_chunk)
no_epc.extend(no_epc_chunk)
# Append the failed data to the main data
epc_data.extend(epc_data_failed)
# Append the failed data to the main data
# Store the chunk locally as a csv
pd.DataFrame(epc_data_chunk).to_csv(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i}.csv"), index=False)
epc_data.extend(epc_data_chunk)
epc_df = pd.DataFrame(epc_data)
@ -339,7 +375,7 @@ def app():
"current-energy-efficiency": "SAP score on register",
"current-energy-rating": "EPC rating on register",
"property-type": "Property Type",
"built-form": "Archetype",
"built-form": "Archetype - EPC",
"total-floor-area": "Property Floor Area",
"construction-age-band": "Property Age Band",
"floor-height": "Property Floor Height",
@ -375,7 +411,7 @@ def app():
num_floors=x["Estimated Number of Floors"],
floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
perimeter=x["Estimated Perimeter (m)"],
built_form=x["Archetype"]
built_form=x["Archetype - EPC"]
),
axis=1
)
@ -406,3 +442,48 @@ def app():
matches_review = asset_list[
[FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"]
]
import requests
import base64
API_KEY = "c4afe10370d67eeaa44f067dd37d115263f6c90e"
URL = "https://epc.opendatacommunities.org/api/v1/domestic/search?size=20"
email = "itskruel@gmail.com"
AUTH_TOKEN = base64.b64encode(
":".join([email, API_KEY]).encode("utf-8")
)
AUTH_TOKEN = "aXRza3J1ZWxAZ21haWwuY29tOmM0YWZlMTAzNzBkNjdlZWFhNDRmMDY3ZGQzN2QxMTUyNjNmNmM5MGU="
headers = {
"Authorization": "Basic {auth_token}".format(auth_token=AUTH_TOKEN),
"Accept": "application/json",
}
params = {
"UPRN": "766024370"
}
response = requests.get(url="https://epc.opendatacommunities.org/api/v1/domestic/search?size=20&UPRN=766024370",
headers=headers)
response.json()
data = response.json()
from operator import itemgetter
newest = sorted(data["rows"], key=itemgetter('lodgement-date'))
data["rows"][0]["lodgement-date"]
data["rows"][1]["lodgement-date"]
import pandas as pd
df = pd.DataFrame(data["rows"])
df["uprn"].values[2]
df[df["uprn"] == "3455035000"]["property-type"]
from backend.apis.GoogleSolarApi import GoogleSolarApi

View file

@ -503,7 +503,9 @@ class Recommendations:
impact_summary.append(
{
"phase": rec["phase"],
"representative": rec["recommendation_id"] in representative_ids,
"recommendation_id": rec["recommendation_id"],
"measure_type": rec["measure_type"],
"sap": sap + rec["sap_points"],
"carbon": carbon - rec["co2_equivalent_savings"],
"heat_demand": heat_demand - rec["heat_demand"],