import pandas as pd from tqdm import tqdm import os from model_data.BoreholeClient import BoreholeClient from model_data.LandRegistryClient import LandRegistryClient from model_data.ConservationAreaClient import ConservationAreaClient from model_data.temp_inputs import input_data from model_data.Property import Property from model_data.config import EPC_AUTH_TOKEN from epc_api.client import EpcClient from model_data.downloader import pagenated_epc_download from model_data.EpcClean import EpcClean from model_data.OpenUprnClient import OpenUprnClient from model_data.analysis.UvalueEstimations import UvalueEstimations LAND_REGISTRY_PATHS = [ os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-monthly-update-new-version.csv", os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2022 (1).csv", os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2021.csv", os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2020.csv", os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2019.csv", os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2018.csv", os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part1.csv", os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part2.csv", ] def handler(): # To begin with, the input data is a list of dictionaries, however we would read this file in epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN) input_properties = [ Property(postcode=config['postcode'], address1=config['address1'], epc_client=epc_client) for config in input_data ] for p in input_properties: p.search_address_epc() p.set_year_built() uprns = [p.data['uprn'] for p in input_properties] open_uprn_client = OpenUprnClient( path=os.path.abspath( os.path.dirname(__file__) ) + "/model_data/local_data/osopenuprn_202306_csv/osopenuprn_202305.csv", uprns=uprns ) open_uprn_client.read() # We're using Ordinance Survey Open Uprn data # to find the coordinates of each address, which we will then be able to use at a later stage for p in input_properties: p.get_coordinates(open_uprn_client) conservation_area_client = ConservationAreaClient( historic_england_path=os.path.abspath( os.path.dirname(__file__) ) + "/model_data/local_data/Historic_Eng_Conservation_Areas/Conservation_Areas.shp", gov_path=os.path.abspath( os.path.dirname(__file__) ) + "/model_data/local_data/gov-conservation-area.geojson" ) conservation_area_client.read() # Check if the property is in a conversation area for p in input_properties: p.set_is_in_conservation_area(conservation_area_client) local_authorities = {p.data['local-authority'] for p in input_properties} # TODO: Do this at a constituency level constituencies = {p.data["constituency"] for p in input_properties} property_types = ["bungalow", "flat", "house", "maisonette", "park home"] # We pull properties from local authorities, by property type. This will allow us to build # a dataset of up to 10k properties per local authority/property type combination # For particularly old EPC data, we have inconsistent records so we'll only include EPCS that were # conducted after 2010, since SAP09 was introduced in 2009 an later SAP12 was introduced in England # and Wales from 31 July 2014 # Download data from August 2014 onwards data = [] for c in tqdm(constituencies): for pt in property_types: data.extend( pagenated_epc_download( client=epc_client, params={ "constituency": c, "property-type": pt, "from-month": 8, "from-year": 2014, }, page_size=5000, n_pages=10, ) ) # Incorporate input data into cleaning cleaner = EpcClean(data + [p.data for p in input_properties]) cleaner.clean() address_meta = [ { "postcode": x["postcode"].upper(), "address1": x["address1"].upper(), "address2": x["address2"].upper(), "address3": x["address3"].upper(), "address": x["address"], "uprn": x["uprn"] } for x in data ] # Land registry land_registry_client = LandRegistryClient( paths=LAND_REGISTRY_PATHS, addresses=address_meta ) lr_data = land_registry_client.read() # Borehole borehole_client = BoreholeClient( path=os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/borehole/borehole.dbf" ) borehole_client.read() # Now, for our input properties, we need to identify the components of the building, based # on the cleaning we've done for p in input_properties: p.get_components(cleaner) # TODO: Add property age band into this uvalue_estimates = UvalueEstimations(data=data) uvalue_estimates.get_estimates(cleaner=cleaner) # all_data = { # "input_properties": input_properties, # "cleaner": cleaner, # "uvalue_estimates": uvalue_estimates, # "land_registry_client": land_registry_client, # "borehole_client": borehole_client, # "conservation_area_client": conservation_area_client, # "open_uprn_client": open_uprn_client, # "data": data # } # import pickle # with open("all_data.pkl", "wb") as f: # pickle.dump(all_data, f) # input_properties[4].data["address1"] # input_properties[4].data["postcode"] # floors_df["address1"].values[4] # floors_df["original_description"].values[4] # # df = pd.DataFrame( # [ # x.data for x in input_properties # ] # ) # df["property-type"].unique() # # from model_data.recommendations.WallRecommendations import WallRecommendations # all_res = [] # for p in input_properties: # inst = WallRecommendations(property_instance=p, uvalue_estimates=uvalue_estimates) # inst.recommend() # n_recs = len(inst.recommendations) # all_res.append(n_recs) # # self = WallRecommendations(property_instance=input_properties[2], uvalue_estimates=uvalue_estimates) # input_properties[6].walls # self.recommend() # df = pd.DataFrame(self.recommendations[0]["parts"]) # recommendations = pd.DataFrame(self.recommendations) # # from model_data.recommendations.FloorRecommendations import FloorRecommendations # self = FloorRecommendations(property_instance=input_properties[4], uvalue_estimates=uvalue_estimates) # self.recommendations # self.recommend() # self.recommendations # # # We need to deduce a U-value for "Good" energy effieciency # # mainheating = pd.DataFrame( # [{"address1": p.address1, "postcode": p.postcode, **p.main_heating} for p in input_properties]) # hotwater = pd.DataFrame([{"address1": p.address1, **p.hotwater} for p in input_properties]) # # mainheating[["address1", "postcode"]] # # # TODO: I want to knwo what "Good" efficiency means for the description # # 'Flat 28, 22 Adelina Grove' 'Solid brick, as built, insulated (assumed)' # # so to do this, filter on the local authority code and property type, where we have U # # values for the wall and take a median! # # p = input_properties[6] # df = pd.DataFrame(data) # # res = [] # for p in input_properties: # distances = [] # for borehole in tqdm(borehole_client.data, total=len(borehole_client.data)): # dist_meeters, _ = borehole_client.distance_between_bng_coords( # x1_bng=p.coordinates['x_coordinate'], # y1_bng=p.coordinates['y_coordinate'], # x2_bng=float(borehole['EASTING']), # y2_bng=float(borehole['NORTHING']) # ) # distances.append(dist_meeters) # # res.append( # { # "uprn": int(p.data["uprn"]), # "meters_to_nearest_borehole": min(distances) # } # # ) # res = pd.DataFrame(res) # # properties_dataset = [ # { # **p.data, # "in_conservation_area": p.in_conservation_area, # **p.coordinates, # # } for p in input_properties # ] # # properties_dataset = pd.DataFrame(properties_dataset) # properties_dataset = properties_dataset.merge(res, on="uprn", how="left") # # properties_dataset.to_csv("properties_dataset.csv") # We test estimating gain import pandas as pd pd.set_option('display.max_rows', 500) pd.set_option('display.max_columns', 500) pd.set_option('display.width', 1000) df = pd.DataFrame(data) # We want to estimate for making improvements on different property components response = "environment-impact-current" # We could potentially build models by constituency to avoid having too many # features in the model base_features = [ "property-type", "built-form", # "construction-age-band", "number-habitable-rooms", "constituency", "number-heated-rooms", ] component_features = [ "walls-description", "floor-description", "lighting-description", "windows-description", "roof-description", "mainheat-description", "main-fuel" ] model_data = df[[response] + component_features + base_features] model_data = model_data.reset_index(drop=True) model_data["idx"] = model_data.index.copy() # Append on u-value estimates model_data = model_data.merge( pd.DataFrame(cleaner.cleaned["walls-description"])[["original_description", "thermal_transmittance"]].rename( columns={"thermal_transmittance": "walls_u_value", } ), how="left", left_on="walls-description", right_on="original_description" ) \ .drop(columns=["original_description"]) \ .merge( pd.DataFrame(cleaner.cleaned["floor-description"])[["original_description", "thermal_transmittance"]].rename( columns={"thermal_transmittance": "floor_u_value", } ), how="left", left_on="floor-description", right_on="original_description" ) # Take just entries with U-values model_data = model_data[ ~pd.isnull(model_data["walls_u_value"]) & ~pd.isnull(model_data["floor_u_value"]) ] model_data = model_data[ base_features + [c for c in component_features if c not in [ "walls-description", "floor-description"]] + ["walls_u_value", "floor_u_value", response] ] # We need to split the data into a train and test set for model build categorical_cols = [ "property-type", "built-form", "number-habitable-rooms", "constituency", "number-heated-rooms", "lighting-description", "windows-description", "roof-description", "mainheat-description", "main-fuel", ] # If these categorical variables are not of type 'category', convert them for col in categorical_cols: model_data[col] = model_data[col].astype('category') # Dummy out the categorical variables training_data = pd.get_dummies(model_data, columns=categorical_cols, drop_first=True) # Convert booleans to integer for col in training_data.columns: if training_data[col].dtype == bool: training_data[col] = training_data[col].astype(int) if training_data[col].dtype == object: training_data[col] = training_data[col].astype(float) import statsmodels.api as sm # Assuming 'df' is your DataFrame X = training_data.drop(columns=response) Y = training_data[response] # Add a constant to the independent value X1 = sm.add_constant(X) # make regression model model = sm.OLS(Y, X1) # fit model and print results results = model.fit() print(results.summary()) import matplotlib.pyplot as plt import numpy as np def plot_regression(df): # Extract the "fit" and "actual" columns from the dataframe fit = df['fit'] actual = df['actual'] # Create an array of x-values (assumed to be sequential integers) x = np.arange(len(df)) # Plot the fit and actual data plt.plot(x, fit, color='red', label='Fit') plt.plot(x, actual, color='blue', label='Actual') # Set labels and title plt.xlabel('Index') plt.ylabel('Value') plt.title('Linear Regression - Fit vs Actual') # Display legend plt.legend() # Show the plot plt.show() import numpy as np from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \ median_absolute_error, mean_absolute_percentage_error def calculate_regression_metrics(y_true, y_pred, n=20): """ Calculate the 5 most important accuracy metrics for regression. Args: y_true (array-like): Array of true target values. y_pred (array-like): Array of predicted target values. Returns: dict: Dictionary containing the calculated metrics. """ metrics = {} metrics['MAPE'] = mean_absolute_percentage_error(y_true, y_pred) metrics['Mean Squared Error'] = mean_squared_error(y_true, y_pred) metrics['Mean Absolute Error'] = mean_absolute_error(y_true, y_pred) metrics['R2 Score'] = r2_score(y_true, y_pred) metrics['Explained Variance Score'] = explained_variance_score(y_true, y_pred) metrics['Median Absolute Error'] = median_absolute_error(y_true, y_pred) metrics['Mean True Value'] = y_true.mean() metrics['Mean Predicted Value'] = y_pred.mean() errors = pd.DataFrame() errors['Fit'] = y_true errors['Actual'] = y_pred errors['Residual'] = errors['Actual'] - errors['Fit'] errors['Absolute Residual'] = np.abs(errors['Residual']) worst_errors = errors.nlargest(n, 'Absolute Residual') return metrics, worst_errors fit_error, worst_errors = calculate_regression_metrics(y_true=Y, y_pred=results.fittedvalues) model_data['fit'] = results.fittedvalues # The worst errors over index heavily for flats worst_x = model_data[model_data.index.isin(worst_errors.index)] # Notes # TODO: We might want to look at adding in the u-value estimates for the properties that do not have them # so that we have move data. # TODO: Add in the u-values for roofs rather than the description # TODO: Add in the actual property features for walls, floors, roof, not just the u-value # TODO: Think about how we use sap vs rdsap - should we add a feature in the model for transaction-type? # TODO: Remove cases where descriptions have no data or are error cases # # property type looks okay - we're definitely low on the number of bungalows # number-habitable-rooms & number-heated-rooms is unpopulated so pretty useless atm # **** constituency should be looked at - potentially modelled individually as some constituencies # peform much worse that others despite enough data. # **** Lighting is a bit of mess - needs to be looked at. Most properties are of the same type # and a few of the categories just have barely any data and poor scores # **** windows-description again most of the properties are of the same type, need more samples # for thge smaller groups # **** Turn roof into U-value # **** mainheat is a bad one - community scheme seems to actually be quite a lot of properties, it's ok for # MAPE though. grouped_error = [] groupby = ["mainheat-description"] for group, data in model_data.groupby(groupby, observed=True): group_fit_error, _ = calculate_regression_metrics(y_true=data[response].astype(float), y_pred=data["fit"]) # plot_regression(pd.DataFrame({"fit": data["fit"].values, "actual": data[response].astype(float).values})) grouped_error.append( { **dict(zip(groupby, group)), "n_samples": data.shape[0], **group_fit_error, } ) grouped_error = pd.DataFrame(grouped_error) grouped_error = grouped_error.sort_values("R2 Score", ascending=True) fit_df = pd.DataFrame( { "fit": results.fittedvalues, "actual": Y } ) # Sort on magnitude of actual fit_df = fit_df.sort_values("actual", ascending=True) plot_regression(fit_df) model_data[["thermal_transmittance", response]].corr() summary = model_data.groupby(["property-type", "built-form"], observed=True)[ ["thermal_transmittance", response] ].corr() summary = ( model_data .groupby(component_features + base_features) .agg({response: 'median', "idx": 'size'}) .reset_index() ) summary = summary.sort_values("walls-description") example = summary[ (summary["walls-description"].isin( [ "Solid brick, as built, no insulation (assumed)", "Solid brick, as built, partial insulation (assumed)", "Solid brick, as built, insulated (assumed)", ] )) & (summary["property-type"] == "House") & (summary["built-form"] == "Detached") & # (summary["construction-age-band"] == "England and Wales: 1976-1982") (summary["number-habitable-rooms"] == "4") ]