From 496ae8c969ea214981190b0b00536ccfc4827fc2 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 13 Jun 2024 02:29:41 +0100 Subject: [PATCH] Set up some different clustering approaches --- etl/customers/stonewater/shdf_3_clustering.py | 562 ++++++++++++++---- 1 file changed, 437 insertions(+), 125 deletions(-) diff --git a/etl/customers/stonewater/shdf_3_clustering.py b/etl/customers/stonewater/shdf_3_clustering.py index c7afa28d..c853fa94 100644 --- a/etl/customers/stonewater/shdf_3_clustering.py +++ b/etl/customers/stonewater/shdf_3_clustering.py @@ -1082,131 +1082,6 @@ def compile_data(): ) # We merge this spatial data onto final EPCS - spatial_data_to_uprn = spatial_data_to_uprn.drop( - columns=["partition", "filename"] - ).rename(columns={"UPRN": "uprn"}) - spatial_data_to_uprn["uprn"] = spatial_data_to_uprn["uprn"].astype(str) - - property_attributes = complete_epcs.merge( - spatial_data_to_uprn, - how="inner", - on="uprn" - ) - - # We drop the columns we don't care about for clustering - property_attributes = property_attributes.drop( - columns=[ - "address", - "uprn-source", - "heating-cost-potential", - "hot-water-cost-potential", - "potential-energy-rating", - "environment-impact-potential", - "address3", - "local-authority-label", - "sheating-energy-eff", - "local-authority-label", - "county", - "postcode", - "constituency", - "co2-emissions-potential", - "energy-consumption-potential", - "local-authority", - "inspection-date", - "address1", - "constituency-label", - "building-reference-number", - "floor-energy-eff", - "address2", - "posttown", - "floor-env-eff", - "sheating-env-eff", - "lighting-cost-potential", - "main-heating-controls", - "transaction-type", - "uprn", - "lodgement-date", - "lmk-key", - "wind-turbine-count", - "tenure", - "potential-energy-efficiency", - ] - ) - - # Fields to transform: lodgement-datetime - property_attributes["days_since_last_epc"] = ( - datetime.now() - pd.to_datetime(property_attributes["lodgement-datetime"]) - ).dt.days - - property_attributes = property_attributes.drop(columns=["lodgement-datetime"]) - - # Up to: - # Round averages to nearest integer - fill_with_average = [ - "low-energy-fixed-light-count", - "floor-height", - "heating-cost-current", - "fixed-lighting-outlets-count", - "hot-water-cost-current", - "number-heated-rooms", - "co2-emiss-curr-per-floor-area", - "total-floor-area", - "environment-impact-current", - "co2-emissions-current", - "number-habitable-rooms", - "energy-consumption-current", - 'lighting-cost-current', - "low_energy_lighting", - ] - - fill_with_mode = [ - "multi-glaze-proportion", - "extension-count", - ] - - fill_with_zero = [ - "unheated-corridor-length", - "number-open-fireplaces", - "glazed-area", - "photo-supply", - ] - - fill_with_categorical = { - "construction-age-band": "unknown", - "mainheat-energy-eff": "N/A", - "windows-env-eff": "N/A", - "lighting-energy-eff": "N/A", - "energy-tariff": 'NO DATA!', - "mechanical-ventilation": 'NO DATA!', - "solar-water-heating-flag": "N", - "mains-gas-flag": "N", - "heat-loss-corridor": "unknown", - "flat-storey-count": "Not a flat", - "roof-energy-eff": "N/A", - "hot-water-env-eff": "N/A", - "mainheatc-energy-eff": "N/A", - "main-fuel": 'NO DATA!', - "lighting-env-eff": "N/A", - "windows-energy-eff": "N/A", - "roof-env-eff": "N/A", - "walls-env-eff": "N/A", - "mainheat-env-eff": "N/A", - "flat-top-storey": "N", - "mainheatc-env-eff": "N", - "floor-level": "NODATA!", - "hot-water-energy-eff": "N/A", - } - - # Consolidation columns to single value - consolidation_columns = { - "glazed-type": {"from": ['', 'NO DATA!', 'not defined', 'INVALID!'], "to": "unknown"}, - "mechanical-ventilation": {"from": ['', 'NO DATA!', 'not defined', 'INVALID!'], "to": "unknown"}, - "solar-water-heating-flag": {"from": [''], "to": "N"}, - "mains-gas-flag": {"from": [''], "to": "N"}, - "heat-loss-corridor": {"from": ['NO DATA!', ''], "to": "N"}, - "flat-top-storey": {"from": [''], "to": "N"}, - "floor-level": {"from": [""], "to": "NODATA!"} - } def concatenate_row(row): @@ -1256,6 +1131,11 @@ def compile_data_final(): } ) uprn_lookup_2["match_type"] = "EPC" + uprn_lookup_2["uprn"] = np.where( + uprn_lookup_2["internal_id"] == 1091, + 83143766, + uprn_lookup_2["uprn"] + ) uprn_lookup_3 = pd.DataFrame(json.loads(read_from_s3( bucket_name="retrofit-data-dev", @@ -1319,6 +1199,12 @@ def compile_data_final(): ) epc_data = pd.DataFrame(epc_data) + epc_data["uprn"] = np.where( + epc_data["internal_id"] == 1091, + 83143766, + epc_data["uprn"] + ) + # We drop come EPCS epc_data = epc_data[epc_data["internal_id"].isin(uprn_lookup_2["internal_id"].values)] @@ -1510,6 +1396,432 @@ def compile_data_final(): if searcher.older_epcs is not None: older_epcs_batch_2[property["internal_id"]] = searcher.older_epcs + # Store in S3 + # TODO - read in instead of running + # save_data_to_s3( + # data=json.dumps(epc_data_batch_2), + # s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.json", + # bucket_name="retrofit-data-dev" + # ) + # + # save_data_to_s3( + # data=json.dumps(older_epcs_batch_2), + # s3_file_name="customers/Stonewater/clustering/older_epcs_batch_2.json", + # bucket_name="retrofit-data-dev" + # ) + + epc_data_batch_2 = pd.DataFrame(epc_data_batch_2) + complete_epcs = pd.concat([epc_data, epc_data_batch_2]) + + # We now prepare the final data for clustering + uprn_filenames = read_dataframe_from_s3_parquet( + bucket_name="retrofit-data-dev", file_key="spatial/filename_meta.parquet" + ) + + uprn_map = {} + for uprn in complete_epcs["uprn"]: + filtered_df = uprn_filenames[ + (uprn_filenames["lower"] <= int(uprn)) + & (uprn_filenames["upper"] >= int(uprn)) + ] + if filtered_df["filenames"].values[0] in uprn_map: + uprn_map[filtered_df["filenames"].values[0]].append(int(uprn)) + else: + uprn_map[filtered_df["filenames"].values[0]] = [int(uprn)] + + spatial_data_to_uprn = [] + for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map)): + # Read in the file + spatial_data = read_dataframe_from_s3_parquet( + bucket_name="retrofit-data-dev", file_key=f"spatial/{filename}" + ) + + spatial_df = spatial_data[spatial_data["UPRN"].isin(associated_uprn)] + spatial_data_to_uprn.append(spatial_df) + + spatial_data_to_uprn = pd.concat(spatial_data_to_uprn) + + # TODO: Let's store this in s3 + # save_data_to_s3( + # data=json.dumps(spatial_data_to_uprn.to_dict("records")), + # s3_file_name="scustomers/Stonewater/clustering/spatial_data_to_uprn.json", + # bucket_name="retrofit-data-dev" + # ) + + spatial_data_to_uprn = spatial_data_to_uprn.drop( + columns=["partition", "filename"] + ).rename(columns={"UPRN": "uprn"}) + spatial_data_to_uprn["uprn"] = spatial_data_to_uprn["uprn"].astype(str) + + property_attributes = complete_epcs.merge( + spatial_data_to_uprn, + how="left", + on="uprn" + ) + + # We drop the columns we don't care about for clustering + property_attributes = property_attributes.drop( + columns=[ + "address", + "uprn-source", + "heating-cost-potential", + "hot-water-cost-potential", + "potential-energy-rating", + "environment-impact-potential", + "address3", + "local-authority-label", + "sheating-energy-eff", + "local-authority-label", + "county", + "postcode", + "constituency", + "co2-emissions-potential", + "energy-consumption-potential", + "local-authority", + "inspection-date", + "address1", + "constituency-label", + "building-reference-number", + "floor-energy-eff", + "address2", + "posttown", + "floor-env-eff", + "sheating-env-eff", + "lighting-cost-potential", + "main-heating-controls", + "transaction-type", + "uprn", + "lodgement-date", + "lmk-key", + "wind-turbine-count", + "tenure", + "potential-energy-efficiency", + "glazed-area" + ] + ) + + # Fields to transform: lodgement-datetime + property_attributes["days_since_last_epc"] = ( + datetime.now() - pd.to_datetime(property_attributes["lodgement-datetime"]) + ).dt.days + + property_attributes = property_attributes.drop(columns=["lodgement-datetime"]) + + # Up to: + # Round averages to nearest integer + fill_with_average = [ + "low-energy-fixed-light-count", + "floor-height", + "heating-cost-current", + "fixed-lighting-outlets-count", + "hot-water-cost-current", + "number-heated-rooms", + "co2-emiss-curr-per-floor-area", + "total-floor-area", + "environment-impact-current", + "co2-emissions-current", + "number-habitable-rooms", + "energy-consumption-current", + 'lighting-cost-current', + "low-energy-lighting", + ] + + fill_with_mode = [ + "multi-glaze-proportion", + "extension-count", + ] + + fill_with_zero = [ + "unheated-corridor-length", + "number-open-fireplaces", + "photo-supply", + ] + + fill_with_categorical = { + "construction-age-band": "unknown", + "mainheat-energy-eff": "N/A", + "windows-env-eff": "N/A", + "lighting-energy-eff": "N/A", + "energy-tariff": 'NO DATA!', + "mechanical-ventilation": 'NO DATA!', + "solar-water-heating-flag": "N", + "mains-gas-flag": "N", + "heat-loss-corridor": "unknown", + "flat-storey-count": "Not a flat", + "roof-energy-eff": "N/A", + "hot-water-env-eff": "N/A", + "mainheatc-energy-eff": "N/A", + "main-fuel": 'NO DATA!', + "lighting-env-eff": "N/A", + "windows-energy-eff": "N/A", + "roof-env-eff": "N/A", + "walls-env-eff": "N/A", + "mainheat-env-eff": "N/A", + "flat-top-storey": "N", + "mainheatc-env-eff": "N", + "floor-level": "NODATA!", + "hot-water-energy-eff": "N/A", + } + + # Consolidation columns to single value + consolidation_columns = { + "glazed-type": {"from": ['', 'NO DATA!', 'not defined', 'INVALID!'], "to": "unknown"}, + "mechanical-ventilation": {"from": ['', 'NO DATA!', 'not defined', 'INVALID!'], "to": "unknown"}, + "solar-water-heating-flag": {"from": [''], "to": "N"}, + "mains-gas-flag": {"from": [''], "to": "N"}, + "heat-loss-corridor": {"from": ['NO DATA!', ''], "to": "N"}, + "flat-top-storey": {"from": [''], "to": "N"}, + "floor-level": {"from": [""], "to": "NODATA!"} + } + + # Perform the cleaning + for col in fill_with_average: + property_attributes[col] = property_attributes[col].replace('', None) + avg_val = np.mean([float(x) for x in property_attributes[col].values if x not in [None, "", np.nan]]) + if pd.isnull(avg_val): + raise Exception("something went wrong") + property_attributes[col] = property_attributes[col].fillna(round(avg_val)) + property_attributes[col] = property_attributes[col].astype(float) + + for c in fill_with_zero: + property_attributes[c] = property_attributes[c].replace('', 0) + property_attributes[c] = property_attributes[c].fillna(0) + property_attributes[c] = property_attributes[c].astype(float) + + from scipy import stats + for col in fill_with_mode: + property_attributes[col] = property_attributes[col].replace('', None) + mode_val = stats.mode([float(x) for x in property_attributes[col].values if x not in [None, "", np.nan]])[0] + if pd.isnull(mode_val): + raise Exception("something went wrong") + property_attributes[col] = property_attributes[col].fillna(mode_val) + property_attributes[col] = property_attributes[col].astype(float) + + for c, fill_val in fill_with_categorical.items(): + property_attributes[c] = property_attributes[c].replace('', fill_val) + property_attributes[c] = property_attributes[c].fillna(fill_val) + + # Finally, consolidate + for c, consolidate_config in consolidation_columns.items(): + for v in consolidate_config["from"]: + property_attributes[c] = property_attributes[c].replace(v, consolidate_config["to"]) + + property_attributes["estimated"] = property_attributes["estimated"].fillna(False) + property_attributes["conservation_status"] = property_attributes["conservation_status"].fillna(False) + + # CLUSTERING!! + + # from sklearn.cluster import KMeans + # from sklearn.preprocessing import OneHotEncoder + # from scipy.spatial.distance import cdist + # + # property_attributes.set_index('internal_id', inplace=True) + # + # # Step 1: Prepare the data + # # Identify categorical columns (you might need to adjust this) + # categorical_cols = property_attributes.select_dtypes(include=['object', 'category']).columns.tolist() + # for col in categorical_cols: + # property_attributes[col] = property_attributes[col].astype(str) + # + # # Applying OneHotEncoder + # encoder = OneHotEncoder(sparse=False) + # encoded_cats = encoder.fit_transform(property_attributes[categorical_cols]) + # + # # Creating a new DataFrame with encoded categorical data and original numerical data + # numerical_data = property_attributes.select_dtypes(include=[np.number]) + # data_for_clustering = pd.concat([numerical_data, pd.DataFrame(encoded_cats, index=numerical_data.index)], axis=1) + # + # # Convert all column names to strings to satisfy KMeans requirements + # data_for_clustering.columns = data_for_clustering.columns.astype(str) + # + # # Step 2: K-Means Clustering + # k = 450 # number of clusters + # kmeans = KMeans(n_clusters=k, random_state=0) + # property_attributes['cluster'] = kmeans.fit_predict(data_for_clustering) + # + # # Extracting centroids + # centroids = kmeans.cluster_centers_ + # + # # Step 3: Assign clusters and rank rows + # # Calculating distances from each point to its cluster's centroid + # distances = cdist(data_for_clustering, centroids, 'euclidean') + # min_distances = distances.min(axis=1) + # property_attributes['distance_to_centroid'] = min_distances + # + # # Ranking rows by distance within each cluster + # property_attributes['rank'] = property_attributes.groupby('cluster')['distance_to_centroid'].rank(method='first') + # + # # Sorting to verify + # property_attributes.sort_values(by=['cluster', 'rank'], inplace=True) + # + # # Optional: Displaying the dataframe + # print(property_attributes.head()) + + from sklearn.cluster import KMeans + from sklearn.preprocessing import StandardScaler, OneHotEncoder + from sklearn.compose import ColumnTransformer + from sklearn.pipeline import Pipeline + from scipy.spatial.distance import cdist + id_column = 'internal_id' + property_attributes.set_index(id_column, inplace=True) + + # Define the preprocessing for numerical and categorical features + numerical_features = property_attributes.select_dtypes(include=['int64', 'float64']).columns.tolist() + categorical_features = property_attributes.select_dtypes(include=['object', 'category']).columns.tolist() + + for col in categorical_features: + property_attributes[col] = property_attributes[col].astype(str) + + preprocessor = ColumnTransformer( + transformers=[ + ('num', StandardScaler(), numerical_features), + ('cat', OneHotEncoder(), categorical_features) + ] + ) + + pipeline = Pipeline(steps=[('preprocessor', preprocessor), + ('kmeans', KMeans(n_clusters=10, random_state=0))]) + + # Fit the pipeline to the data + pipeline.fit(property_attributes) + + # Transform the data using the fitted pipeline + processed_data = pipeline.named_steps['preprocessor'].transform(property_attributes) + + # Get cluster labels + property_attributes['cluster'] = pipeline.named_steps['kmeans'].labels_ + + # Get centroids (already in the same transformed space) + centroids = pipeline.named_steps['kmeans'].cluster_centers_ + + processed_data = processed_data.toarray() + + # Calculate distances from each point to the centroid of its cluster + distances_to_centroids = [ + cdist(processed_data[i].reshape(1, -1), centroids[label].reshape(1, -1)).flatten()[0] + for i, label in enumerate(property_attributes['cluster']) + ] + + property_attributes['distance_to_centroid'] = distances_to_centroids + + for cluster_id in property_attributes['cluster'].unique(): + cluster_data = property_attributes[property_attributes['cluster'] == cluster_id] + min_distance = cluster_data['distance_to_centroid'].min() + print(f"Cluster {cluster_id} minimum distance to centroid: {min_distance}") + if min_distance != 0: + print(f"No point with zero distance found in cluster {cluster_id}") + + # Ranking rows by distance within each cluster + property_attributes['rank'] = property_attributes.groupby('cluster')['distance_to_centroid'].rank( + method='first') + + # Sorting to verify + property_attributes.sort_values(by=['cluster', 'rank'], inplace=True) + + ################################################ + # Agglomertive Clustering + ################################################ + + # from sklearn.cluster import KMeans, AgglomerativeClustering + # from sklearn.preprocessing import StandardScaler, OneHotEncoder + # from sklearn.compose import ColumnTransformer + # from sklearn.pipeline import Pipeline + # from scipy.spatial.distance import cdist + # import numpy as np + # from collections import Counter + # + # id_column = 'internal_id' + # property_attributes.set_index(id_column, inplace=True) + # + # # Define the preprocessing for numerical and categorical features + # numerical_features = property_attributes.select_dtypes(include=['int64', 'float64']).columns.tolist() + # categorical_features = property_attributes.select_dtypes(include=['object', 'category']).columns.tolist() + # + # for col in categorical_features: + # property_attributes[col] = property_attributes[col].astype(str) + # + # preprocessor = ColumnTransformer( + # transformers=[ + # ('num', StandardScaler(), numerical_features), + # ('cat', OneHotEncoder(sparse_output=False), categorical_features) + # ] + # ) + # + # # Function to perform clustering and merge small clusters + # def cluster_with_min_size(data, preprocessor, n_clusters=10, min_size=5): + # while True: + # # Preprocess the data + # processed_data = preprocessor.fit_transform(data) + # + # # Initial clustering + # clustering = AgglomerativeClustering(n_clusters=n_clusters) + # labels = clustering.fit_predict(processed_data) + # + # # Check cluster sizes + # cluster_counts = Counter(labels) + # + # # Find clusters smaller than min_size + # small_clusters = {cluster for cluster, count in cluster_counts.items() if count < min_size} + # + # if not small_clusters: + # break + # + # # Merge small clusters + # for cluster in small_clusters: + # # Find the nearest cluster to merge with + # cluster_data = processed_data[labels == cluster] + # other_clusters = [i for i in range(n_clusters) if i not in small_clusters] + # other_cluster_data = [processed_data[labels == i] for i in other_clusters] + # other_centroids = np.vstack([data.mean(axis=0) for data in other_cluster_data]) + # + # distances = cdist(cluster_data, other_centroids).mean(axis=0) + # closest_cluster = other_clusters[np.argmin(distances)] + # + # labels[labels == cluster] = closest_cluster + # + # n_clusters -= len(small_clusters) + # + # return labels + # + # # Perform clustering with minimum size constraint + # n_clusters = 10 + # min_size = 5 + # property_attributes['cluster'] = cluster_with_min_size(property_attributes, preprocessor, n_clusters, min_size) + # + # # Filter out empty clusters + # valid_clusters = property_attributes['cluster'].unique() + # + # # Get centroids for the resulting clusters + # processed_data = preprocessor.transform(property_attributes.drop(columns=["cluster"])) + # centroids = np.vstack([processed_data[property_attributes['cluster'] == i].mean(axis=0) for i in valid_clusters]) + # + # # Calculate distances from each point to the centroid of its cluster + # distances_to_centroids = [ + # cdist(processed_data[i].reshape(1, -1), + # centroids[valid_clusters.tolist().index(label)].reshape(1, -1)).flatten()[0] + # for i, label in enumerate(property_attributes['cluster']) + # ] + # + # property_attributes['distance_to_centroid'] = distances_to_centroids + # + # # Verify that at least one point in each cluster has zero distance to the centroid + # for cluster_id in valid_clusters: + # cluster_data = property_attributes[property_attributes['cluster'] == cluster_id] + # min_distance = cluster_data['distance_to_centroid'].min() + # print(f"Cluster {cluster_id} minimum distance to centroid: {min_distance}") + # if min_distance != 0: + # print(f"No point with zero distance found in cluster {cluster_id}") + # + # # Rank the distances within each cluster + # property_attributes['rank_within_cluster'] = property_attributes.groupby('cluster')['distance_to_centroid'] \ + # .rank(method='first') + # + # # Reset index to get 'internal_id' back + # property_attributes.reset_index(inplace=True) + # + # # Display the DataFrame + # print(property_attributes) + def pull_ideal_postcodes(missing_uprn_with_udprn): api_key = "" # Log into the platform the get the API key: https://account.ideal-postcodes.co.uk/