reviewing stonewater assigned packages

This commit is contained in:
Khalim Conn-Kowlessar 2024-12-10 17:02:59 +00:00
parent c41891f0fa
commit 3c98cfa7cc
5 changed files with 224 additions and 18 deletions

View file

@ -54,6 +54,10 @@ for directory in tqdm(epc_directories):
& (off_gas["is_heritage_building"] != True)
]
off_gas = off_gas[
off_gas["tenure"].isin(["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"])
]
region_summary = off_gas.groupby("postal_region").size().reset_index(name="count")
aggregation.append(region_summary)

View file

@ -2607,5 +2607,130 @@ def propsed_wave_3_sample():
len({v for v in units_in_bid if str(v) in u_aids})
len(list(set(units_in_bid)))
def identify_incorrect_pacakges():
    """
    Flag Stonewater units whose assigned bid package looks inconsistent with the other data held on the unit.

    Due to limitations in the data collected during survey, some properties do not have suitable packages
    assigned. This function labels those properties so they can be flagged for Stonewater's review.

    Two kinds of check are applied to the "Individual Units Programme" sheet of the bid packages workbook:

    1. Heating mismatches: the stockbook "Heating Type" for the unit is compared against the programme sheet's
       "Heating" / "Main Fuel" columns, and one boolean "<type> properties: different to Parity" column is added
       per stockbook heating type.
    2. Roof suitability: units on a package in 3A / 3B / 4 whose surveyed main roof type contains
       "A Another dwelling above" are flagged, and a de-duplicated list of them is written to
       "3A, 3B or 4 Packages with a dwelling above.csv" in CUSTOMER_FOLDER_PATH.

    Units with no surveyed roof type (NaN) are treated as not having a dwelling above, rather than breaking the
    boolean mask.

    :return: the programme sheet, left-joined to the stockbook heating type, with the boolean flag columns added
    """
    units = pd.read_excel(
        os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 14.11.20 V2.xlsx"),
        header=2,
        sheet_name="Individual Units Programme",
    )
    # This sheet contains information on the heating systems for properties, so we can flag any units that have
    # been labelled as being electric but are actually gas
    heating_survey_data = pd.read_excel(
        os.path.join(CUSTOMER_FOLDER_PATH, "STOCKBOOK December 2024 data (5).xlsx"),
        header=0,
        sheet_name="Export",
    )
    units = units.merge(
        heating_survey_data[["Asset Reference", "Heating Type"]],
        how="left",
        left_on="Org. ref.",
        right_on="Asset Reference",
    )

    # Each entry is: (flag column to create, stockbook heating types, programme column to inspect, values in that
    # column which contradict the stockbook heating type)
    heating_checks = [
        (
            "Gas properties: different to Parity",
            ["Gas", "Communal Gas"],
            "Heating",
            [
                "Heat Pump: Electric Heat pumps: Air source heat pump with flow temperature <= 35°C",
                "Electric Storage Systems: Fan storage heaters",
                "Electric (direct acting) room heaters: Panel, convector or radiant heaters",
            ],
        ),
        (
            "Electric properties: different to Parity",
            ["Electric"],
            "Heating",
            [
                "Boiler: A rated Regular Boiler",
                "Boiler: F rated Combi",
                "No Heating",
                "Boiler: A rated CPSU",
                "Boiler: G rated Regular Boiler",
            ],
        ),
        (
            "Ground Source properties: different to Parity",
            ["Ground Source"],
            "Heating",
            [
                "Heat Pump: Electric Heat pumps: Air source heat pump with flow temperature <= 35°C",
                "Electric Storage Systems: Fan storage heaters",
                "Electric Storage Systems: High heat retention storage heaters",
            ],
        ),
        (
            "LPG properties: different to Parity",
            ["Lpg"],
            "Main Fuel",
            ["Gas: Mains Gas", "Solid Fuel: Wood Logs, Gas: Mains Gas"],
        ),
        (
            "Solid Fuel properties: different to Parity",
            ["Solid Fuel"],
            "Main Fuel",
            ["Gas: Mains Gas"],
        ),
    ]
    for flag_column, stockbook_types, programme_column, conflicting_values in heating_checks:
        units[flag_column] = (
            units["Heating Type"].isin(stockbook_types) & units[programme_column].isin(conflicting_values)
        )

    # The next check is to identify properties with specific features that are not conducive to specific packages.
    # E.g. Solar PV packages for properties that have another dwelling above.
    # Package 4 is listed both as a string and as an integer because the column holds mixed types.
    has_solar_package = units["Package Ref"].isin(["3A", "3B", "4", 4])
    # na=False: units without a surveyed roof type must evaluate to False, otherwise the NaN entries make the mask
    # non-boolean and pandas refuses to index with it
    has_dwelling_above = units["Survey: Main Roof Type"].str.contains(
        "A Another dwelling above", regex=False, na=False
    )
    invalid_for_solar = has_solar_package & has_dwelling_above

    # Export the affected units, one row per distinct address / reference / roof type, for review
    dwelling_above_units = (
        units.loc[
            invalid_for_solar,
            ["Survey: Matching Address ID", "Survey: Org. ref.", "Survey: Main Roof Type"],
        ]
        .drop_duplicates()
        .sort_values("Survey: Matching Address ID")
    )
    dwelling_above_units.to_csv(
        os.path.join(CUSTOMER_FOLDER_PATH, "3A, 3B or 4 Packages with a dwelling above.csv"), index=False
    )

    # Label properties that have been matched to a package, during coordination, that includes Solar PV and has
    # a property with a dwelling above
    units["Invalid Roof Type for Solar - coordination to be reviewed"] = invalid_for_solar

    # TODO: label properties that have a dwelling above in the Parity data, and weren't surveyed, but have been
    #  assigned a package that includes solar PV
    return units
# if __name__ == "__main__":
# main()

View file

@ -0,0 +1,77 @@
"""
This is the list of properties, based on the EPC data, that look eligible for WHLG
"""
import pandas as pd
from etl.epc.settings import EARLIEST_EPC_DATE
from etl.spatial.OpenUprnClient import OpenUprnClient
# Builds the Waltham Forest WHLG Pathway 1 eligibility list from the bulk EPC download.
#
# Steps:
#   1. Load the EPC certificates and keep recent, D-G rated, privately rented or owner-occupied records with a UPRN
#   2. Attach conservation / listed / heritage flags for each UPRN
#   3. Pathway 1: keep records whose postcode is on the WHLG eligible postcode list and write them to CSV
#   4. Pathway 2 or 3: everything else (the household will need to be means tested)

# Input / output locations (local to the analyst's machine)
EPC_CERTIFICATES_PATH = (
    "/Users/khalimconn-kowlessar/Downloads/all-domestic-certificates/domestic-E09000031-Waltham-Forest/certificates.csv"
)
WHLG_POSTCODES_PATH = "/Users/khalimconn-kowlessar/Downloads/WHLG-eligible-postcodes.xlsx"
PATHWAY1_OUTPUT_PATH = (
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Waltham Forest WHLG - Pathway 1 Eligibility.csv"
)

# Filters applied to the EPC records
TARGET_EPC_RATINGS = ["D", "E", "F", "G"]
# Both capitalisations are listed because the tenure field is not consistently cased in the EPC data
ELIGIBLE_TENURES = ["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]

# Columns carried through to the Pathway 1 output, mapped to their presentation names.
# "None" means the column keeps its original name.
PATHWAY1_COLUMNS = {
    "uprn": None,
    "address": None,
    "address1": None,
    "postcode": None,
    "current-energy-rating": "EPC Rating",
    "current-energy-efficiency": "SAP Score",
    "lodgement-date": "EPC Date",
    "has_conservation_restrictions": "Conservation Area Restrictions",
    "walls-description": "Wall Type",
    "roof-description": "Roof Type",
    "mainheat-description": "Main Heating",
}

epc_data = pd.read_csv(EPC_CERTIFICATES_PATH)
# Normalise the column names to lower-case, hyphen separated
epc_data.columns = [c.replace("_", "-").lower() for c in epc_data.columns]

# All of the row filters are independent of each other, so they are applied in one pass. The explicit copy means
# the column assignment below writes to its own frame, not to a slice of the raw download.
epc_data = epc_data[
    (epc_data["lodgement-date"] >= EARLIEST_EPC_DATE)
    & epc_data["uprn"].notna()
    & epc_data["current-energy-rating"].isin(TARGET_EPC_RATINGS)
    & epc_data["tenure"].isin(ELIGIBLE_TENURES)
].copy()
# UPRNs can be up to 12 digits, so use an explicit 64-bit integer: a platform-default int can be 32-bit
epc_data["uprn"] = epc_data["uprn"].astype("int64")

whlg_eligible_postcodes = pd.read_excel(
    WHLG_POSTCODES_PATH,
    sheet_name="Eligible postcodes",
    header=1,
)
# Format:
whlg_eligible_postcodes = whlg_eligible_postcodes[["Postcode", "Local Authority"]]

uprns = epc_data["uprn"].unique()
# Get data
ca_data = OpenUprnClient.get_spatial_data(uprns, "retrofit-data-dev")

restriction_columns = ["conservation_status", "is_listed_building", "is_heritage_building"]
epc_data = epc_data.merge(
    ca_data[["UPRN"] + restriction_columns].rename(columns={"UPRN": "uprn"}),
    how="left",
    on="uprn",
)
# Explicit comparison with True: UPRNs with no spatial match come back as NaN after the left join and must count
# as unrestricted
epc_data["has_conservation_restrictions"] = epc_data[restriction_columns].eq(True).any(axis=1)

# Pathway 1:
# Match based on eligible postcodes
pathway1 = epc_data[epc_data["postcode"].isin(whlg_eligible_postcodes["Postcode"].values)]
pathway1 = pathway1[list(PATHWAY1_COLUMNS)].rename(
    columns={source: target for source, target in PATHWAY1_COLUMNS.items() if target is not None}
)

# Parse the lodgement date once and derive both the formatted date and the year the EPC was lodged from it
epc_dates = pd.to_datetime(pathway1["EPC Date"])
pathway1["EPC Date"] = epc_dates.dt.strftime("%Y-%m-%d")
pathway1["EPC Year"] = epc_dates.dt.year

pathway1.to_csv(PATHWAY1_OUTPUT_PATH, index=False)

# Pathway 2 or 3
# The household will need to be means tested
pathway2 = epc_data[~epc_data["uprn"].isin(pathway1["uprn"].values)]

View file

@ -308,6 +308,7 @@ class RetrieveFindMyEpc:
"Heating controls (programmer, and thermostatic radiator valves)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Replacement warm air unit": []
}
survey = True

View file

@ -25,7 +25,7 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
epc_data = []
errors = []
no_epc = []
# home = asset_list[asset_list["row_id"] == errors[15]].squeeze()
# home = asset_list[asset_list["row_id"] == errors[5]].squeeze()
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
try:
postcode = home[postcode_column]
@ -154,21 +154,17 @@ def app():
Property UPRN
"""
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Bromford"
DATA_FILENAME = "BROMFORD - SOLAR PV ROOFs INSPECTED - Electric only properties getting to C list.xlsx"
SHEET_NAME = "MAIN"
POSTCODE_COLUMN = "Post Code"
FULLADDRESS_COLUMN = "Full Address"
ADDRESS1_COLUMN = None
ADDRESS1_METHOD = "first_two_words"
ADDRESS_COLS_TO_CONCAT = ["House No", "Street", "District"]
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Watford"
DATA_FILENAME = "JS Mailing List 10122024.xlsx"
SHEET_NAME = "Export"
POSTCODE_COLUMN = "Postcode"
FULLADDRESS_COLUMN = "Property Address"
ADDRESS1_COLUMN = "Address Line 1"
ADDRESS1_METHOD = None
ADDRESS_COLS_TO_CONCAT = []
# Maps addresses to uprn in problematic cases
MANUAL_UPRN_MAP = {
"1 Ivy Court, The Gardens, Erdington, Birmingham": 100071442178,
"8 Ivy Court, The Gardens, Erdington, Birmingham": 10033393299,
"7 Ivy Court, The Gardens, Erdington, Birmingham": 100071442184,
}
MANUAL_UPRN_MAP = {}
asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
@ -197,6 +193,7 @@ def app():
# Drop the dupes
print(f"There are {asset_list['deduper'].duplicated().sum()} duplicated addresses - dropping")
asset_list = asset_list[~asset_list["deduper"].duplicated()]
asset_list = asset_list.drop(columns=["deduper"])
epc_data, errors, no_epc = get_data(
asset_list=asset_list,
@ -212,7 +209,8 @@ def app():
asset_list=asset_list_failed,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN
postcode_column=POSTCODE_COLUMN,
manual_uprn_map=MANUAL_UPRN_MAP
)
# Append the failed data to the main data
@ -261,6 +259,7 @@ def app():
"row_id",
"uprn",
"address1",
"address",
"postcode",
"property-type",
"built-form",
@ -282,7 +281,7 @@ def app():
"energy-consumption-current", # kwh/m2
"photo-supply",
]
].rename(columns={"address1": "Address1 on EPC", "postcode": "Postcode on EPC"})
].rename(columns={"address1": "Address1 on EPC", "address": "Address on EPC", "postcode": "Postcode on EPC"})
asset_list = asset_list.merge(
epc_df,
@ -376,9 +375,9 @@ def app():
asset_list = asset_list.drop(columns=["row_id"])
# Store as an excel
filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx"
filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx"
asset_list.to_excel(filename, index=False)
matches_review = asset_list[
[FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address1 on EPC", "Postcode on EPC"]
[FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"]
]