mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
402 lines
14 KiB
Python
402 lines
14 KiB
Python
import pandas as pd
|
|
from tqdm import tqdm
|
|
import os
|
|
from model_data.BoreholeClient import BoreholeClient
|
|
from model_data.LandRegistryClient import LandRegistryClient
|
|
from model_data.ConservationAreaClient import ConservationAreaClient
|
|
|
|
from model_data.temp_inputs import input_data
|
|
from model_data.Property import Property
|
|
from model_data.config import EPC_AUTH_TOKEN
|
|
from epc_api.client import EpcClient
|
|
from model_data.downloader import pagenated_epc_download
|
|
from model_data.EpcClean import EpcClean
|
|
from model_data.OpenUprnClient import OpenUprnClient
|
|
from model_data.analysis.UvalueEstimations import UvalueEstimations
|
|
|
|
# Absolute paths of the local Land Registry CSV files, resolved against the
# directory containing this module. Order is preserved from newest to oldest.
LAND_REGISTRY_PATHS = [
    os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/" + csv_name
    for csv_name in (
        "pp-monthly-update-new-version.csv",
        "pp-2022 (1).csv",
        "pp-2021.csv",
        "pp-2020.csv",
        "pp-2019.csv",
        "pp-2018.csv",
        "pp-2017-part1.csv",
        "pp-2017-part2.csv",
    )
]
|
|
|
|
|
|
def handler():
|
|
# To begin with, the input data is a list of dictionaries, however we would read this file in
|
|
|
|
epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
|
|
|
|
input_properties = [
|
|
Property(postcode=config['postcode'], address1=config['address1'], epc_client=epc_client)
|
|
for config in input_data
|
|
]
|
|
|
|
for p in input_properties:
|
|
p.search_address_epc()
|
|
p.set_year_built()
|
|
|
|
uprns = [p.data['uprn'] for p in input_properties]
|
|
|
|
open_uprn_client = OpenUprnClient(
|
|
path=os.path.abspath(
|
|
os.path.dirname(__file__)
|
|
) + "/model_data/local_data/osopenuprn_202306_csv/osopenuprn_202305.csv",
|
|
uprns=uprns
|
|
)
|
|
open_uprn_client.read()
|
|
|
|
# We're using Ordnance Survey Open UPRN data
|
|
# to find the coordinates of each address, which we will then be able to use at a later stage
|
|
for p in input_properties:
|
|
p.get_coordinates(open_uprn_client)
|
|
|
|
conservation_area_client = ConservationAreaClient(
|
|
historic_england_path=os.path.abspath(
|
|
os.path.dirname(__file__)
|
|
) + "/model_data/local_data/Historic_Eng_Conservation_Areas/Conservation_Areas.shp",
|
|
gov_path=os.path.abspath(
|
|
os.path.dirname(__file__)
|
|
) + "/model_data/local_data/gov-conservation-area.geojson"
|
|
)
|
|
conservation_area_client.read()
|
|
|
|
# Check if the property is in a conservation area
|
|
for p in input_properties:
|
|
p.set_is_in_conservation_area(conservation_area_client)
|
|
|
|
local_authorities = {p.data['local-authority'] for p in input_properties}
|
|
# TODO: Do this at a constituency level
|
|
constituencies = {p.data["constituency"] for p in input_properties}
|
|
property_types = ["bungalow", "flat", "house", "maisonette", "park home"]
|
|
|
|
# We pull properties from local authorities, by property type. This will allow us to build
|
|
# a dataset of up to 10k properties per local authority/property type combination
|
|
data = []
|
|
for la in tqdm(local_authorities):
|
|
for pt in property_types:
|
|
data.extend(
|
|
pagenated_epc_download(
|
|
client=epc_client,
|
|
params={"local-authority": la, "property-type": pt},
|
|
page_size=5000,
|
|
n_pages=10,
|
|
)
|
|
)
|
|
|
|
# Incorporate input data into cleaning
|
|
cleaner = EpcClean(data + [p.data for p in input_properties])
|
|
cleaner.clean()
|
|
|
|
address_meta = [
|
|
{
|
|
"postcode": x["postcode"].upper(),
|
|
"address1": x["address1"].upper(),
|
|
"address2": x["address2"].upper(),
|
|
"address3": x["address3"].upper(),
|
|
"address": x["address"],
|
|
"uprn": x["uprn"]
|
|
} for x in data
|
|
]
|
|
|
|
# Land registry
|
|
land_registry_client = LandRegistryClient(
|
|
paths=LAND_REGISTRY_PATHS,
|
|
addresses=address_meta
|
|
)
|
|
lr_data = land_registry_client.read()
|
|
|
|
# Borehole
|
|
borehole_client = BoreholeClient(
|
|
path=os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/borehole/borehole.dbf"
|
|
)
|
|
borehole_client.read()
|
|
|
|
# Now, for our input properties, we need to identify the components of the building, based
|
|
# on the cleaning we've done
|
|
for p in input_properties:
|
|
p.get_components(cleaner)
|
|
|
|
# TODO: Add property age band into this
|
|
uvalue_estimates = UvalueEstimations(data=data)
|
|
uvalue_estimates.get_estimates(cleaner=cleaner)
|
|
|
|
# all_data = {
|
|
# "input_properties": input_properties,
|
|
# "cleaner": cleaner,
|
|
# "uvalue_estimates": uvalue_estimates,
|
|
# "land_registry_client": land_registry_client,
|
|
# "borehole_client": borehole_client,
|
|
# "conservation_area_client": conservation_area_client,
|
|
# "open_uprn_client": open_uprn_client,
|
|
# "data": data
|
|
# }
|
|
|
|
# import pickle
|
|
# with open("all_data.pkl", "wb") as f:
|
|
# pickle.dump(all_data, f)
|
|
|
|
# input_properties[4].data["address1"]
|
|
# input_properties[4].data["postcode"]
|
|
# floors_df["address1"].values[4]
|
|
# floors_df["original_description"].values[4]
|
|
#
|
|
# df = pd.DataFrame(
|
|
# [
|
|
# x.data for x in input_properties
|
|
# ]
|
|
# )
|
|
# df["property-type"].unique()
|
|
#
|
|
# from model_data.recommendations.WallRecommendations import WallRecommendations
|
|
# all_res = []
|
|
# for p in input_properties:
|
|
# inst = WallRecommendations(property_instance=p, uvalue_estimates=uvalue_estimates)
|
|
# inst.recommend()
|
|
# n_recs = len(inst.recommendations)
|
|
# all_res.append(n_recs)
|
|
#
|
|
# self = WallRecommendations(property_instance=input_properties[2], uvalue_estimates=uvalue_estimates)
|
|
# input_properties[6].walls
|
|
# self.recommend()
|
|
# df = pd.DataFrame(self.recommendations[0]["parts"])
|
|
# recommendations = pd.DataFrame(self.recommendations)
|
|
#
|
|
# from model_data.recommendations.FloorRecommendations import FloorRecommendations
|
|
# self = FloorRecommendations(property_instance=input_properties[4], uvalue_estimates=uvalue_estimates)
|
|
# self.recommendations
|
|
# self.recommend()
|
|
# self.recommendations
|
|
#
|
|
# # We need to deduce a U-value for "Good" energy efficiency
|
|
#
|
|
# mainheating = pd.DataFrame(
|
|
# [{"address1": p.address1, "postcode": p.postcode, **p.main_heating} for p in input_properties])
|
|
# hotwater = pd.DataFrame([{"address1": p.address1, **p.hotwater} for p in input_properties])
|
|
#
|
|
# mainheating[["address1", "postcode"]]
|
|
#
|
|
# # TODO: I want to know what "Good" efficiency means for the description
|
|
# # 'Flat 28, 22 Adelina Grove' 'Solid brick, as built, insulated (assumed)'
|
|
# # so to do this, filter on the local authority code and property type, where we have U
|
|
# # values for the wall and take a median!
|
|
#
|
|
# p = input_properties[6]
|
|
# df = pd.DataFrame(data)
|
|
#
|
|
# res = []
|
|
# for p in input_properties:
|
|
# distances = []
|
|
# for borehole in tqdm(borehole_client.data, total=len(borehole_client.data)):
|
|
# dist_meeters, _ = borehole_client.distance_between_bng_coords(
|
|
# x1_bng=p.coordinates['x_coordinate'],
|
|
# y1_bng=p.coordinates['y_coordinate'],
|
|
# x2_bng=float(borehole['EASTING']),
|
|
# y2_bng=float(borehole['NORTHING'])
|
|
# )
|
|
# distances.append(dist_meeters)
|
|
#
|
|
# res.append(
|
|
# {
|
|
# "uprn": int(p.data["uprn"]),
|
|
# "meters_to_nearest_borehole": min(distances)
|
|
# }
|
|
#
|
|
# )
|
|
# res = pd.DataFrame(res)
|
|
#
|
|
# properties_dataset = [
|
|
# {
|
|
# **p.data,
|
|
# "in_conservation_area": p.in_conservation_area,
|
|
# **p.coordinates,
|
|
#
|
|
# } for p in input_properties
|
|
# ]
|
|
#
|
|
# properties_dataset = pd.DataFrame(properties_dataset)
|
|
# properties_dataset = properties_dataset.merge(res, on="uprn", how="left")
|
|
#
|
|
# properties_dataset.to_csv("properties_dataset.csv")
|
|
|
|
# We test estimating gain
|
|
import pandas as pd
|
|
pd.set_option('display.max_rows', 500)
|
|
pd.set_option('display.max_columns', 500)
|
|
pd.set_option('display.width', 1000)
|
|
df = pd.DataFrame(data)
|
|
|
|
# We want to estimate for making improvements on different property components
|
|
response = "environment-impact-current"
|
|
# We could potentially build models by constituency to avoid having too many
|
|
# features in the model
|
|
base_features = [
|
|
"property-type",
|
|
"built-form",
|
|
# "construction-age-band",
|
|
"number-habitable-rooms",
|
|
"constituency",
|
|
]
|
|
|
|
component_features = [
|
|
"walls-description",
|
|
"floor-description",
|
|
]
|
|
|
|
model_data = df[[response] + component_features + base_features]
|
|
model_data = model_data.reset_index(drop=True)
|
|
model_data["idx"] = model_data.index.copy()
|
|
|
|
# Append on u-value estimates
|
|
model_data = model_data.merge(
|
|
pd.DataFrame(cleaner.cleaned["walls-description"])[["original_description", "thermal_transmittance"]],
|
|
how="left",
|
|
left_on="walls-description",
|
|
right_on="original_description"
|
|
)
|
|
# Take just entries with U-values
|
|
model_data = model_data[~pd.isnull(model_data["thermal_transmittance"])]
|
|
model_data = model_data[base_features + ["thermal_transmittance", response]]
|
|
|
|
# We need to split the data into a train and test set for model build
|
|
categorical_cols = [
|
|
"property-type", "built-form", "number-habitable-rooms", "constituency",
|
|
]
|
|
|
|
# If these categorical variables are not of type 'category', convert them
|
|
for col in categorical_cols:
|
|
model_data[col] = model_data[col].astype('category')
|
|
|
|
# Dummy out the categorical variables
|
|
training_data = pd.get_dummies(model_data, columns=categorical_cols, drop_first=True)
|
|
|
|
# Convert booleans to integer
|
|
for col in training_data.columns:
|
|
if training_data[col].dtype == bool:
|
|
training_data[col] = training_data[col].astype(int)
|
|
|
|
if training_data[col].dtype == object:
|
|
training_data[col] = training_data[col].astype(float)
|
|
|
|
import statsmodels.api as sm
|
|
|
|
# Split the training data into predictors (X) and the response (Y)
|
|
X = training_data.drop(columns=response)
|
|
Y = training_data[response]
|
|
|
|
# Add a constant to the independent value
|
|
X1 = sm.add_constant(X)
|
|
|
|
# make regression model
|
|
model = sm.OLS(Y, X1)
|
|
|
|
# fit model and print results
|
|
results = model.fit()
|
|
print(results.summary())
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
def plot_regression(df):
    """Draw fitted and actual values as two lines on one chart and show it.

    Args:
        df: Object indexable by 'fit' and 'actual' (e.g. a DataFrame with
            those two columns). Rows are plotted in their existing order
            against a 0..len(df)-1 position axis.
    """
    # Pull both series up front so a missing column fails before any drawing
    fitted_values, actual_values = df['fit'], df['actual']

    # Sequential integer positions stand in for the x-axis
    positions = np.arange(len(df))

    # One line per series: fitted in red, actual in blue
    series_styles = (
        (fitted_values, 'red', 'Fit'),
        (actual_values, 'blue', 'Actual'),
    )
    for values, colour, legend_label in series_styles:
        plt.plot(positions, values, color=colour, label=legend_label)

    # Axis labels, title and legend
    plt.xlabel('Index')
    plt.ylabel('Value')
    plt.title('Linear Regression - Fit vs Actual')
    plt.legend()

    # Render the figure
    plt.show()
|
|
|
|
import numpy as np
|
|
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
|
|
median_absolute_error
|
|
|
|
def calculate_regression_metrics(y_true, y_pred, n=20):
    """
    Calculate five regression accuracy metrics and the worst-fitting rows.

    Args:
        y_true (array-like): Array of true target values.
        y_pred (array-like): Array of predicted target values.
        n (int): Number of rows with the largest absolute residual to return.

    Returns:
        tuple: ``(metrics, worst_errors)``. ``metrics`` is a dict keyed by
        metric name. ``worst_errors`` is a DataFrame holding the ``n`` rows
        with the largest absolute residual, with columns 'Fit', 'Actual',
        'Residual' and 'Absolute Residual'.
    """
    metrics = {
        'Mean Squared Error': mean_squared_error(y_true, y_pred),
        'Mean Absolute Error': mean_absolute_error(y_true, y_pred),
        'R2 Score': r2_score(y_true, y_pred),
        'Explained Variance Score': explained_variance_score(y_true, y_pred),
        'Median Absolute Error': median_absolute_error(y_true, y_pred),
    }

    # 'Actual' is the observed value and 'Fit' is the prediction. Assigning
    # y_true first keeps the frame indexed by the observed values' index.
    errors = pd.DataFrame()
    errors['Actual'] = y_true
    errors['Fit'] = y_pred
    # Residual follows the usual convention: observed minus fitted
    errors['Residual'] = errors['Actual'] - errors['Fit']
    errors['Absolute Residual'] = errors['Residual'].abs()

    # Keep the established column order for callers that rely on it
    errors = errors[['Fit', 'Actual', 'Residual', 'Absolute Residual']]

    worst_errors = errors.nlargest(n, 'Absolute Residual')

    return metrics, worst_errors
|
|
|
|
fit_error, worst_errors = calculate_regression_metrics(y_true=Y, y_pred=results.fittedvalues)
|
|
|
|
worst_x = model_data[model_data.index.isin(worst_errors.index)]
|
|
# The worst errors over-index heavily for flats
|
|
|
|
fit_df = pd.DataFrame(
|
|
{
|
|
"fit": results.fittedvalues,
|
|
"actual": Y
|
|
}
|
|
)
|
|
|
|
# Sort on magnitude of actual
|
|
fit_df = fit_df.sort_values("actual", ascending=True)
|
|
plot_regression(fit_df)
|
|
|
|
model_data[["thermal_transmittance", response]].corr()
|
|
|
|
summary = model_data.groupby(["property-type", "built-form"], observed=True)[
|
|
["thermal_transmittance", response]
|
|
].corr()
|
|
|
|
summary = (
|
|
model_data
|
|
.groupby(component_features + base_features)
|
|
.agg({response: 'median', "idx": 'size'})
|
|
.reset_index()
|
|
)
|
|
|
|
summary = summary.sort_values("walls-description")
|
|
|
|
example = summary[
|
|
(summary["walls-description"].isin(
|
|
[
|
|
"Solid brick, as built, no insulation (assumed)",
|
|
"Solid brick, as built, partial insulation (assumed)",
|
|
"Solid brick, as built, insulated (assumed)",
|
|
]
|
|
)) &
|
|
(summary["property-type"] == "House") &
|
|
(summary["built-form"] == "Detached") &
|
|
# (summary["construction-age-band"] == "England and Wales: 1976-1982")
|
|
(summary["number-habitable-rooms"] == "4")
|
|
]
|