Model/model_data/app.py
2023-07-20 15:48:26 +01:00

369 lines
13 KiB
Python

from tqdm import tqdm
import os
from model_data.BoreholeClient import BoreholeClient
from model_data.LandRegistryClient import LandRegistryClient
from model_data.temp_inputs import input_data
from model_data.Property import Property
from model_data.config import EPC_AUTH_TOKEN
from epc_api.client import EpcClient
from model_data.downloader import pagenated_epc_download
from model_data.EpcClean import EpcClean
from open_uprn.OpenUprnClient import OpenUprnClient
from model_data.analysis.UvalueEstimations import UvalueEstimations
LAND_REGISTRY_PATHS = [
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-monthly-update-new-version.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2022 (1).csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2021.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2020.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2019.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2018.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part1.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part2.csv",
]
def handler():
# To begin with, the input data is a list of dictionaries, however we would read this file in
epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
input_properties = [
Property(postcode=config['postcode'], address1=config['address1'], epc_client=epc_client)
for config in input_data
]
for p in input_properties:
p.search_address_epc()
p.set_year_built()
uprns = [p.data['uprn'] for p in input_properties]
open_uprn_client = OpenUprnClient(
path=os.path.abspath(
os.path.dirname(__file__)
) + "/model_data/local_data/osopenuprn_202306_csv/osopenuprn_202305.csv",
uprns=uprns
)
open_uprn_client.read()
# We're using Ordinance Survey Open Uprn data
# to find the coordinates of each address, which we will then be able to use at a later stage
for p in input_properties:
p.get_coordinates(open_uprn_client)
conservation_area_client = ConservationAreaClient(
historic_england_path=os.path.abspath(
os.path.dirname(__file__)
) + "/model_data/local_data/Historic_Eng_Conservation_Areas/Conservation_Areas.shp",
gov_path=os.path.abspath(
os.path.dirname(__file__)
) + "/model_data/local_data/gov-conservation-area.geojson"
)
conservation_area_client.read()
# Check if the property is in a conversation area
for p in input_properties:
in_conservation_area = conservation_area_client.is_in_conservation_area(p.coordinates)
p.set_is_in_conservation_area(in_conservation_area)
local_authorities = {p.data['local-authority'] for p in input_properties}
# TODO: Do this at a constituency level
constituencies = {p.data["constituency"] for p in input_properties}
property_types = ["bungalow", "flat", "house", "maisonette", "park home"]
# We pull properties from local authorities, by property type. This will allow us to build
# a dataset of up to 10k properties per local authority/property type combination
# For particularly old EPC data, we have inconsistent records so we'll only include EPCS that were
# conducted after 2010, since SAP09 was introduced in 2009 an later SAP12 was introduced in England
# and Wales from 31 July 2014
# Download data from August 2014 onwards
data = []
for c in tqdm(constituencies):
for pt in property_types:
data.extend(
pagenated_epc_download(
client=epc_client,
params={
"constituency": c,
"property-type": pt,
"from-month": 8,
"from-year": 2014,
},
page_size=5000,
n_pages=10,
)
)
# Incorporate input data into cleaning
cleaner = EpcClean(data + [p.data for p in input_properties])
cleaner.clean()
address_meta = [
{
"postcode": x["postcode"].upper(),
"address1": x["address1"].upper(),
"address2": x["address2"].upper(),
"address3": x["address3"].upper(),
"address": x["address"],
"uprn": x["uprn"]
} for x in data
]
import pickle
with open("sample_addresses.pkl", "wb") as f:
pickle.dump(address_meta, f)
# Land registry
land_registry_client = LandRegistryClient(
paths=LAND_REGISTRY_PATHS,
addresses=address_meta
)
lr_data = land_registry_client.read()
# Borehole
borehole_client = BoreholeClient(
path=os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/borehole/borehole.dbf"
)
borehole_client.read()
# Now, for our input properties, we need to identify the components of the building, based
# on the cleaning we've done
for p in input_properties:
p.get_components(cleaner)
# TODO: Add property age band into this
uvalue_estimates = UvalueEstimations(data=data)
uvalue_estimates.get_estimates(cleaner=cleaner)
# all_data = {
# "input_properties": input_properties,
# "cleaner": cleaner,
# "uvalue_estimates": uvalue_estimates,
# "land_registry_client": land_registry_client,
# "borehole_client": borehole_client,
# "conservation_area_client": conservation_area_client,
# "open_uprn_client": open_uprn_client,
# "data": data
# }
# import pickle
# with open("all_data.pkl", "wb") as f:
# pickle.dump(all_data, f)
# input_properties[4].data["address1"]
# input_properties[4].data["postcode"]
# floors_df["address1"].values[4]
# floors_df["original_description"].values[4]
#
# df = pd.DataFrame(
# [
# x.data for x in input_properties
# ]
# )
# df["property-type"].unique()
#
# from model_data.recommendations.WallRecommendations import WallRecommendations
# all_res = []
# for p in input_properties:
# inst = WallRecommendations(property_instance=p, uvalue_estimates=uvalue_estimates)
# inst.recommend()
# n_recs = len(inst.recommendations)
# all_res.append(n_recs)
#
# self = WallRecommendations(property_instance=input_properties[2], uvalue_estimates=uvalue_estimates)
# input_properties[6].walls
# self.recommend()
# df = pd.DataFrame(self.recommendations[0]["parts"])
# recommendations = pd.DataFrame(self.recommendations)
#
# from model_data.recommendations.FloorRecommendations import FloorRecommendations
# self = FloorRecommendations(property_instance=input_properties[4], uvalue_estimates=uvalue_estimates)
# self.recommendations
# self.recommend()
# self.recommendations
#
# # We need to deduce a U-value for "Good" energy effieciency
#
# mainheating = pd.DataFrame(
# [{"address1": p.address1, "postcode": p.postcode, **p.main_heating} for p in input_properties])
# hotwater = pd.DataFrame([{"address1": p.address1, **p.hotwater} for p in input_properties])
#
# mainheating[["address1", "postcode"]]
#
# # TODO: I want to knwo what "Good" efficiency means for the description
# # 'Flat 28, 22 Adelina Grove' 'Solid brick, as built, insulated (assumed)'
# # so to do this, filter on the local authority code and property type, where we have U
# # values for the wall and take a median!
#
# p = input_properties[6]
# df = pd.DataFrame(data)
#
# res = []
# for p in input_properties:
# distances = []
# for borehole in tqdm(borehole_client.data, total=len(borehole_client.data)):
# dist_meeters, _ = borehole_client.distance_between_bng_coords(
# x1_bng=p.coordinates['x_coordinate'],
# y1_bng=p.coordinates['y_coordinate'],
# x2_bng=float(borehole['EASTING']),
# y2_bng=float(borehole['NORTHING'])
# )
# distances.append(dist_meeters)
#
# res.append(
# {
# "uprn": int(p.data["uprn"]),
# "meters_to_nearest_borehole": min(distances)
# }
#
# )
# res = pd.DataFrame(res)
#
# properties_dataset = [
# {
# **p.data,
# "in_conservation_area": p.in_conservation_area,
# **p.coordinates,
#
# } for p in input_properties
# ]
#
# properties_dataset = pd.DataFrame(properties_dataset)
# properties_dataset = properties_dataset.merge(res, on="uprn", how="left")
#
# properties_dataset.to_csv("properties_dataset.csv")
# We test estimating gain
import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
df = pd.DataFrame(data)
# We need to split the data into a train and test set for model build
# If these categorical variables are not of type 'category', convert them
print(results.summary())
grouped_error = []
groupby = ["mainheat-description"]
for group, data in model_data.groupby(groupby, observed=True):
group_fit_error, _ = calculate_regression_metrics(y_true=data[response].astype(float), y_pred=data["fit"])
# plot_regression(pd.DataFrame({"fit": data["fit"].values, "actual": data[response].astype(float).values}))
grouped_error.append(
{
**dict(zip(groupby, group)),
"n_samples": data.shape[0],
**group_fit_error,
}
)
grouped_error = pd.DataFrame(grouped_error)
grouped_error = grouped_error.sort_values("R2 Score", ascending=True)
plot_regression(fit_df)
model_data[["thermal_transmittance", response]].corr()
summary = model_data.groupby(["property-type", "built-form"], observed=True)[
["thermal_transmittance", response]
].corr()
summary = (
model_data
.groupby(component_features + base_features)
.agg({response: 'median', "idx": 'size'})
.reset_index()
)
summary = summary.sort_values("walls-description")
example = summary[
(summary["walls-description"].isin(
[
"Solid brick, as built, no insulation (assumed)",
"Solid brick, as built, partial insulation (assumed)",
"Solid brick, as built, insulated (assumed)",
]
)) &
(summary["property-type"] == "House") &
(summary["built-form"] == "Detached") &
# (summary["construction-age-band"] == "England and Wales: 1976-1982")
(summary["number-habitable-rooms"] == "4")
]
from textblob import TextBlob
converter = TextBlob("excelent lighting in this hosehold")
from model_data.utils import correct_spelling
result = correct_spelling("excelent lighting in this hosehold")
print(result)
'excellent lighting in this household'
def app():
"""
For a pre-defined list of constituencies and property types, we'll download EPC data from the API
and produce a dataset of cleaned fields so that when we get new properties, we can quickly
sanitise any description data
:return:
"""
epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
constituencies = {'E14000555', 'E14000726', 'E14000720', 'E14000721', 'E14000553', 'E14000752'}
property_types = ["bungalow", "flat", "house", "maisonette", "park home"]
# We pull properties from local authorities, by property type. This will allow us to build
# a dataset of up to 10k properties per local authority/property type combination
# For particularly old EPC data, we have inconsistent records so we'll only include EPCS that were
# conducted after 2010, since SAP09 was introduced in 2009 an later SAP12 was introduced in England
# and Wales from 31 July 2014
# Download data from August 2014 onwards
data = []
for c in tqdm(constituencies):
for pt in property_types:
data.extend(
pagenated_epc_download(
client=epc_client,
params={
"constituency": c,
"property-type": pt,
"from-month": 8,
"from-year": 2014,
},
page_size=5000,
n_pages=10,
)
)
# Production of sample data for land registry
address_meta = [
{
"postcode": x["postcode"].upper(),
"address1": x["address1"].upper(),
"address2": x["address2"].upper(),
"address3": x["address3"].upper(),
"address": x["address"],
"uprn": x["uprn"]
} for x in data
]
import pickle
with open("sample_addresses.pkl", "wb") as f:
pickle.dump(address_meta, f)
# Incorporate input data into cleaning
cleaner = EpcClean(data)
cleaner.clean()
# TODO: cleaner.cleaned datasets to a db
# TODO: Add property age band into this
uvalue_estimates = UvalueEstimations(data=data)
uvalue_estimates.get_estimates(cleaner=cleaner)
# TODO: Store these to a db
uvalue_estimates.floors_decile_data