diff --git a/.idea/Model.iml b/.idea/Model.iml
index 762580d9..df6c4faa 100644
--- a/.idea/Model.iml
+++ b/.idea/Model.iml
@@ -7,7 +7,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index c916a158..50cad4ca 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/etl/customers/remote_assessments/app.py b/etl/customers/remote_assessments/app.py
index e1298565..f32dcea6 100644
--- a/etl/customers/remote_assessments/app.py
+++ b/etl/customers/remote_assessments/app.py
@@ -19,9 +19,9 @@ def app():
asset_list = [
{
- "address": "49 Brailsford Road",
- "postcode": "M14 6PT",
- "uprn": 77145666,
+ "address": "19 Hillcrest Court",
+ "postcode": "IP21 4YJ",
+ "uprn": 2630134524,
}
]
asset_list = pd.DataFrame(asset_list)
@@ -52,8 +52,8 @@ def app():
valuation_data = [
{
- "uprn": 77145666,
- "valuation": 337_000
+ "uprn": 2630134524,
+ "valuation": 96_000
}
]
# Store valuation data to s3
diff --git a/etl/customers/stonewater/potential_eco_properties.py b/etl/customers/stonewater/potential_eco_properties.py
index eef82eae..6666ce15 100644
--- a/etl/customers/stonewater/potential_eco_properties.py
+++ b/etl/customers/stonewater/potential_eco_properties.py
@@ -368,9 +368,10 @@ def app():
additional_properties2 = additional_properties[[
"Address", "Postcode", "Address ID", "SAP", "SAP Band", "Property Type", "Walls", "Roofs", "Glazing",
"Heating", "Main Fuel", "Hot Water", "Renewables", "Total Floor Area", 'Installed under ECO3',
- 'Same Postcode as Installed under ECO3'
+ 'Same Postcode as Installed under ECO3', "Organisation Reference",
]].rename(
columns={
+ "Organisation Reference": "Org. ref.",
"SAP": "Parity - Predicted SAP",
"SAP Band": "Parity - Predicted SAP Band",
"Age": "Parity - Build Age",
@@ -387,7 +388,12 @@ def app():
)
# Combine the data:
- full_dataset = pd.concat([stonewater_cavity_properties, additional_properties2])
+
+ stonewater_cavity_properties2 = stonewater_cavity_properties.merge(
+ features[["Address", "Organisation Reference"]], how="left", on="Organisation Reference"
+ )
+ full_dataset = pd.concat([stonewater_cavity_properties2, additional_properties2])
+ full_dataset = full_dataset.drop(columns=['Osm. ID'])
# We not define the priority list for non-intrusives
full_dataset["Postal Region"] = full_dataset["Postcode"].str.split(" ").str[0].str[0:2]
@@ -414,7 +420,7 @@ def app():
df.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Non-intrusives/10022025 Non-Intrusives - "
- "revised list.xlsx",
+ "revised list.csv",
index=False
)
diff --git a/etl/route_march_data_pull/app.py b/etl/route_march_data_pull/app.py
index cc50caae..dba85b3f 100644
--- a/etl/route_march_data_pull/app.py
+++ b/etl/route_march_data_pull/app.py
@@ -1,7 +1,6 @@
import os
import time
-import pickle
-
+from BaseUtility import Definitions
import pandas as pd
import numpy as np
from tqdm import tqdm
@@ -17,6 +16,10 @@ from recommendations.recommendation_utils import (
estimate_number_of_floors
)
+from etl.epc_clean.epc_attributes.attribute_utils import (
+ extract_thermal_transmittance
+)
+
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
@@ -158,6 +161,53 @@ def extract_address1(asset_list, full_address_col, method="first_two_words"):
raise ValueError(f"Method {method} not recognized")
+def process_age_band(x, year_built_column):
+ year_built = float(x[year_built_column])
+
+ if pd.isnull(x["Property Age Band"]) or (
+ x["Property Age Band"] in Definitions.DATA_ANOMALY_MATCHES
+ ) or pd.isnull(year_built):
+ return "No EPC Age Band"
+
+ # We check if we have a numeric data
+ if x["Property Age Band"].isdigit():
+ if year_built == float(x["Property Age Band"]):
+ return "EPC Age Band Matches Year Built"
+ if year_built > float(x["Property Age Band"]):
+ return "EPC Age Band is older than Year Built"
+ if year_built < float(x["Property Age Band"]):
+ return "EPC Age Band is newer than Year Built"
+
+ # Handle specific case
+ if x["Property Age Band"] == "England and Wales: 2007 onwards":
+ if year_built >= 2007:
+ return "EPC Age Band Matches Year Built"
+ if year_built < 2007:
+ return "EPC Age Band is older than Year Built"
+
+ if x["Property Age Band"] == "England and Wales: before 1900":
+ if year_built < 1900:
+ return "EPC Age Band Matches Year Built"
+ if year_built >= 1900:
+ return "EPC Age Band is newer than Year Built"
+
+ # Age band will be formatted as such:
+ # 'England and Wales: {upper date}-{lower date}'
+ # so we extract the lower and upper date
+ age_band = x["Property Age Band"].split(": ")[1]
+ lower_date, upper_date = age_band.split("-")
+ if year_built <= float(upper_date) and year_built <= float(upper_date):
+ return "EPC Age Band Matches Year Built"
+
+ if year_built > float(upper_date):
+ return "EPC Age Band is older than Year Built"
+
+ if year_built < float(upper_date):
+ return "EPC Age Band is newer than Year Built"
+
+ raise Exception("Should not reach here")
+
+
def app():
"""
This app is EPC pulling data for some properties owned by Livewest
@@ -179,17 +229,47 @@ def app():
Heat loss calculations
EPC recommendations
Property UPRN
-
"""
- DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Places For People"
- DATA_FILENAME = "Regulated Stock - Do Not Change (06.06.24).xlsx"
- SHEET_NAME = "Assets 1"
+
+ # TODO:
+ # For cavity work:
+ # - Flag any entries that have a different wall type between non-intrusive data against EPC
+ # - Worth double checking entries that have a difference in wall construction
+ # - Look at anything that is flagged as an empty cavity but the EPC data says it’s a filled cavity
+ # - Look at the current EPC scores - Anything that is C75 or above, especially if it’s assumed no insulation
+ # - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats
+ # are less than C75
+ # - Flag anything pre SAP2012
+ # - Flag anything over 5 years old
+ # - Look at year built vs age band
+ #
+ # For Solar:
+ # - Discount any that have solar PV - based on non-intrusives and from the inspections team
+ # - In the heating, discount anything that isn’t ashp, ghsp, hhrs, electric storage - possibly homes with
+ # electric room heaters but it might need to be an EPC E
+ # - Fabric - check the floor, wall and roof:
+ # - Filled or empty cavity is good
+ # - Insulated solid/timber/system built is good
+ # - SCIS/CEG needs solid floors
+ # - JJC don’t care
+ # - Anything with a loft 200 or below
+ # - Anything C75 and above won’t qualify
+ # - Insulated loft = 200mm
+ # - We want: fully insulated property (all wall types), EPC D or below (floors should be solid)
+ # - Or the insulation required is loft/cavity (floors should be solid)
+
+ DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Eastlight"
+ DATA_FILENAME = "Eastlight addresses potential PV data pull required.xlsx"
+ SHEET_NAME = "Sheet1"
POSTCODE_COLUMN = "Postcode"
- FULLADDRESS_COLUMN = "Address"
- ADDRESS1_COLUMN = "AddressLine1"
+ FULLADDRESS_COLUMN = None
+ ADDRESS1_COLUMN = "HouseName"
ADDRESS1_METHOD = None
- ADDRESS_COLS_TO_CONCAT = []
+ ADDRESS_COLS_TO_CONCAT = [
+ "HouseName", "Block", "Address1"
+ ]
MISSING_POSTCODES_METHOD = None
+ PROPERTY_YEAR_BUILT = 'Built In Year'
# Maps addresses to uprn in problematic cases
MANUAL_UPRN_MAP = {}
@@ -216,6 +296,7 @@ def app():
asset_list[col] = asset_list[col].astype(str)
asset_list[col] = asset_list[col].str.replace('\xa0', ' ', regex=False)
asset_list[col] = asset_list[col].str.replace(' ', ' ', regex=False)
+ asset_list[col] = asset_list[col].str.strip()
if ADDRESS1_COLUMN is None:
ADDRESS1_COLUMN = "address1_extracted"
@@ -226,7 +307,15 @@ def app():
if FULLADDRESS_COLUMN is None:
FULLADDRESS_COLUMN = "fulladdress_extracted"
# We concatenate the columns in ADDRESS_COLS_TO_CONCAT, on commas
- asset_list[FULLADDRESS_COLUMN] = asset_list[ADDRESS_COLS_TO_CONCAT].apply(lambda x: ", ".join(x), axis=1)
+ # Sometimes, some of the columns are empty, so we need to remove them
+ asset_list[FULLADDRESS_COLUMN] = asset_list[ADDRESS_COLS_TO_CONCAT].apply(
+ lambda x: ", ".join([y for y in x if not pd.isnull(y)]), axis=1
+ )
+
+ # We clean up portential non-breaking spaces, and double spaces
+ asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].astype(str)
+ asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace('\xa0', ' ', regex=False)
+ asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace(' ', ' ', regex=False)
# We check for duplicated addresses
asset_list["deduper"] = asset_list[FULLADDRESS_COLUMN] + asset_list[POSTCODE_COLUMN]
@@ -237,8 +326,10 @@ def app():
asset_list = asset_list.drop(columns=["deduper"])
# We chunk up this data into 5000 rows at a time
+ # Create the chunks directory
+ if not os.path.exists(os.path.join(DATA_FOLDER, "Chunks")):
+ os.makedirs(os.path.join(DATA_FOLDER, "Chunks"))
chunk_size = 5000
- epc_data = []
errors = []
no_epc = []
skip = None # Used to skip already completed chunks
@@ -275,9 +366,19 @@ def app():
# Store the chunk locally as a csv
pd.DataFrame(epc_data_chunk).to_csv(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i}.csv"), index=False)
- epc_data.extend(epc_data_chunk)
+ # We read in and concatenate the created created chunks
+ chunks_folder = os.path.join(DATA_FOLDER, "Chunks")
+ # List the contents
+ chunk_files = os.listdir(chunks_folder)
+ epc_data = []
+ for file in chunk_files:
+ csv_data = pd.read_csv(os.path.join(chunks_folder, file))
+ # We need to convert the recommendations back to a list
+ csv_data["recommendations"] = csv_data["recommendations"].apply(eval)
+ csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval)
+ epc_data.append(csv_data)
- epc_df = pd.DataFrame(epc_data)
+ epc_df = pd.concat(epc_data)
# We expand out the recommendations
recommendations_df = epc_df[["row_id", "recommendations"]]
@@ -302,9 +403,9 @@ def app():
transformed_data.append(row_data)
transformed_df = pd.DataFrame(transformed_data)
- # Drop the column that is ""
- if "" in transformed_df.columns:
- transformed_df = transformed_df.drop(columns=[""])
+ # At the moment, we're only using a limited set of columns - let's jut keep cavity wall insulation
+ # recommendations
+ transformed_df = transformed_df[["row_id", "Cavity wall insulation"]]
# Get the find my epc data
find_my_epc_data = epc_df[["row_id", "find_my_epc_data"]].drop(columns=["find_my_epc_data"]).join(
@@ -342,7 +443,9 @@ def app():
"energy-consumption-current", # kwh/m2
"photo-supply",
]
- ].rename(columns={"address1": "Address1 on EPC", "address": "Address on EPC", "postcode": "Postcode on EPC"})
+ ].rename(
+ columns={"address1": "Address1 on EPC", "address": "Address on EPC", "postcode": "Postcode on EPC"}
+ )
asset_list = asset_list.merge(
epc_df,
@@ -422,6 +525,138 @@ def app():
axis=1
)
+ # We produce some additional fields
+ # 1) Is the SAP rating below C75
+ asset_list["SAP Rating is 75 and below"] = asset_list["SAP score on register"] <= 75
+ # 2) Flag anything where the EPC is older than 5 years
+ cutoff_year = pd.Timestamp.now().year - 5
+ asset_list[f"EPC is pre {cutoff_year}"] = (
+ pd.to_datetime(asset_list["Date of last EPC"]).dt.year < cutoff_year
+ )
+
+ # 3) If we have year in the asset list, we flag entries where the built year is different from the
+ # EPC Age band
+ if PROPERTY_YEAR_BUILT is not None:
+ asset_list["Does Age Match EPC Age Band?"] = asset_list.apply(
+ lambda x: process_age_band(x, PROPERTY_YEAR_BUILT), axis=1
+ )
+
+ # 4) Flag properties that look like they're good candidates for solar installs
+ # Firstly, flag if the fabric is completely done
+
+ insulated_wall_substrings = [
+ ", insulated", "with external insulation", "with internal insulation", "filled cavity"
+ ]
+
+ insulated_roof_substrings = [
+ "(another dwelling above)", "limited insulation", "(other premises above)",
+ ", no insulation",
+ ]
+
+ def check_solar_insulation_conditions(x):
+
+ if pd.isnull(x["Wall Construction"]):
+ return None
+
+ if "average thermal transmittance" in x["Wall Construction"].lower():
+ # We extract out the u-values
+ wall_uvalue = extract_thermal_transmittance({}, x["Wall Construction"])[0]["thermal_transmittance"]
+ roof_uvalue = extract_thermal_transmittance({}, x["Roof Construction"])[0]["thermal_transmittance"]
+ floor_uvalue = extract_thermal_transmittance({}, x["Floor Construction"])[0]["thermal_transmittance"]
+
+ roof_uvalue = 0 if roof_uvalue is None else roof_uvalue
+ floor_uvalue = 0 if floor_uvalue is None else floor_uvalue
+
+ # We apply some cutoffs
+ if wall_uvalue < 0.7 and roof_uvalue < 0.7 and floor_uvalue < 0.7:
+ return "Walls, Roof and Floor have U-values below 0.7"
+
+ return "Confirm U-values"
+
+ walls_insulated = any(
+ insulated_substring in x["Wall Construction"].lower() for insulated_substring in insulated_wall_substrings
+ )
+ roof_is_numeric = False
+ if str(x["Roof Insulation Thickness"]).isdigit():
+ roof_is_numeric = True
+ roof_insulated = int(x["Roof Insulation Thickness"]) >= 200
+ else:
+ roof_insulated = any(
+ insulated_substring in x["Roof Construction"].lower() for insulated_substring in
+ insulated_roof_substrings
+ )
+
+ floor_is_solid = "solid" in x["Floor Construction"].lower()
+
+ if walls_insulated and roof_insulated and floor_is_solid:
+ return "Walls Insulated, Roof Insulated, Floor Solid"
+
+ if walls_insulated and floor_is_solid and roof_is_numeric:
+ return "Walls Insulated, Floor Solid, Loft need top-up"
+
+ return "Not Fully Insulated or no data"
+
+ asset_list["Solar Fabric Condition"] = asset_list.apply(check_solar_insulation_conditions, axis=1)
+
+ asset_list["Good Solar Candidate"] = (
+ asset_list["SAP Rating is 75 and below"] &
+ ~asset_list["Has Solar PV"] &
+ (
+ asset_list["Heating Type"].isin(
+ [
+ "Electric storage heaters",
+ "Room heaters, electric",
+ ]
+ ) | asset_list["Heating Type"].str.contains("heat pump", case=False)
+ ) & (
+ asset_list["Solar Fabric Condition"].isin(
+ [
+ "Walls Insulated, Roof Insulated, Floor Solid",
+ "Walls, Roof and Floor have U-values below 0.7",
+ "Walls Insulated, Floor Solid, Loft need top-up"
+ ]
+ )
+ )
+ )
+
+ def flat_analysis(asset_list):
+
+ # We need to deduce the building name - we strip out the house number
+ def extract_building_name(x):
+ # TODO: This doesn't really work
+ if pd.isnull(x):
+ return None
+ house_no = SearchEpc.get_house_number(address=x, postcode=None)
+ if house_no:
+ return x.replace(house_no, "").strip()
+ return x.split(",")[0].strip()
+
+ # We want to deduce if flats have 50% of the properties below C75
+ # We group by postcode and property type
+ grouped = asset_list.groupby(["Postcode", "Property Type"])
+
+ flat_data = []
+ for _, group in grouped:
+ if "flat" in group["Property Type"].str.lower().values:
+ num_flats = group["Property Type"].str.lower().value_counts().get("flat", 0)
+ num_below_c75 = group["SAP score on register"].lt(75).sum()
+
+ flat_data.append(
+ {
+ "Postcode": group["Postcode"].iloc[0],
+ "Property Type": "Flat",
+ "Number of Flats with EPC": num_flats,
+ "Number of Flats below C75": num_below_c75,
+ "Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats)
+ }
+ )
+
+ flat_data = pd.DataFrame(flat_data)
+
+ return flat_data
+
+ flat_data = flat_analysis(asset_list)
+
# For all of the columns in transformed_df, prefix with "Recommendation: "
for col in transformed_df.columns:
if col == "row_id":
@@ -436,54 +671,13 @@ def app():
asset_list = asset_list.drop(columns=["row_id", "index"])
# Store as an excel
- filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx"
- asset_list.to_excel(filename, index=False)
+ filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx"
+ # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
+
+ with pd.ExcelWriter(filename) as writer:
+ asset_list.to_excel(writer, sheet_name="EPC Data", index=False)
+ flat_data.to_excel(writer, sheet_name="Flat Data", index=False)
matches_review = asset_list[
[FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"]
]
-
-
-import requests
-import base64
-
-API_KEY = "c4afe10370d67eeaa44f067dd37d115263f6c90e"
-URL = "https://epc.opendatacommunities.org/api/v1/domestic/search?size=20"
-email = "itskruel@gmail.com"
-
-AUTH_TOKEN = base64.b64encode(
- ":".join([email, API_KEY]).encode("utf-8")
-)
-
-AUTH_TOKEN = "aXRza3J1ZWxAZ21haWwuY29tOmM0YWZlMTAzNzBkNjdlZWFhNDRmMDY3ZGQzN2QxMTUyNjNmNmM5MGU="
-
-headers = {
- "Authorization": "Basic {auth_token}".format(auth_token=AUTH_TOKEN),
- "Accept": "application/json",
-}
-
-params = {
- "UPRN": "766024370"
-}
-
-response = requests.get(url="https://epc.opendatacommunities.org/api/v1/domestic/search?size=20&UPRN=766024370",
- headers=headers)
-response.json()
-
-data = response.json()
-
-from operator import itemgetter
-
-newest = sorted(data["rows"], key=itemgetter('lodgement-date'))
-data["rows"][0]["lodgement-date"]
-data["rows"][1]["lodgement-date"]
-
-import pandas as pd
-
-df = pd.DataFrame(data["rows"])
-
-df["uprn"].values[2]
-
-df[df["uprn"] == "3455035000"]["property-type"]
-
-from backend.apis.GoogleSolarApi import GoogleSolarApi