From 2b7ca82d09aea93737d5c93cb0619c55aba71063 Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Tue, 10 Dec 2024 18:55:30 +0000
Subject: [PATCH] creating checking code for Stonewater

---
Review note: line breaks and indentation were reconstructed from a whitespace-collapsed copy of this patch, so
verify context-line whitespace against the target file (or apply with --ignore-whitespace). The second hunk was
revised during review; the post-image blob id on the index line predates that revision.

 .../stonewater/Wave 3 Preparation.py          | 85 +++++++++++++++-----
 1 file changed, 63 insertions(+), 22 deletions(-)

diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py
index bd36d782..4e336f23 100644
--- a/etl/customers/stonewater/Wave 3 Preparation.py
+++ b/etl/customers/stonewater/Wave 3 Preparation.py
@@ -2699,28 +2699,6 @@ def identify_incorrect_pacakges():
 
     # The next check is to identify properties with specific features that are not condusive to specific packages. E.g.
     # Solar PV packages for properties that have another dwelling above
-
-    z = units_with_assigned_packages[
-        units_with_assigned_packages["Package Ref"].isin(
-            [
-                "3A", "3B", "4", 4
-            ]
-        )
-    ]
-    z["Roof Type"].value_counts()
-    z["Survey: Main Roof Type"].value_counts()
-
-    z[z["Survey: Main Roof Type"].str.contains("A Another dwelling above")][
-        "Survey: Matching Address ID"].value_counts()
-
-    zz = z[z["Survey: Main Roof Type"].str.contains("A Another dwelling above")][
-        ["Survey: Matching Address ID", "Survey: Org. ref.", "Survey: Main Roof Type"]
-    ].drop_duplicates()
-    zz = zz.sort_values("Survey: Matching Address ID")
-    zz.to_csv(os.path.join(CUSTOMER_FOLDER_PATH, "3A, 3B or 4 Packages with a dwelling above.csv"), index=False)
-
-    z[z["Survey: Main Roof Type"].str.contains("A Another dwelling above")]["Package Ref"].value_counts()
-
     # Label properties that have been matched to a package, during coordination, that includes Solar PV and has
     # a property with a dwelling above
     units_with_assigned_packages["Invalid Roof Type for Solar - coordination to be reviewed"] = (
@@ -2731,6 +2709,69 @@ def identify_incorrect_pacakges():
 
     # Label properties that have a dwelling above in the Parity data, and weren't surveyed, but have been assigned
     # a package that includes solar PV
+    # NOTE(review): the comment above describes a check against the Parity data for unsurveyed properties, but this
+    # statement tests the survey roof type and writes to the same column as the assignment immediately above it -
+    # confirm the intended source column and flag name.
+    units_with_assigned_packages["Invalid Roof Type for Solar - coordination to be reviewed"] = (
+        (units_with_assigned_packages["Package Ref"].isin(["3A", "3B", "4", 4])) & (
+            units_with_assigned_packages["Survey: Main Roof Type"].str.contains("A Another dwelling above")
+        )
+    )
+
+    # We now iterate through postcodes and find anomalous properties based on the Parity data and survey data
+    fields_to_check = [
+        'Wall Type', 'Roof Type', 'Heating', 'Main Fuel',
+        'Survey: Main Wall Type',
+        'Survey: Main Roof Type', 'Survey: Primary Heating System'
+    ]
+    # Create an empty dictionary to store results
+    aggregated_results = {}
+
+    # Remove parenthesised text (and the whitespace before it) - presumably so variants of a wall type group together
+    units_with_assigned_packages['Wall Type'] = units_with_assigned_packages['Wall Type'].str.replace(
+        r'\s*\(.*?\)', '', regex=True
+    )
+
+    def check_mixed_types(row):
+        """Return True when a row of per-value counts spans more than one primary type.
+
+        `row` holds the counts for one postcode, indexed by the observed field values. Only labels containing a
+        ':' are considered; the text before the first ':' is taken as the primary type.
+        """
+        primary_types_present = {
+            label.split(':')[0]
+            for label, count in row.items()
+            # Non-string labels cannot carry a "primary: detail" prefix; a zero count means the type is absent
+            if isinstance(label, str) and ':' in label and count > 0
+        }
+        return len(primary_types_present) > 1  # True if more than one primary type
+
+    # Process each field
+    for field in fields_to_check:
+        # Group by postcode and count occurrences of each unique value
+        field_counts = (
+            units_with_assigned_packages.groupby(['Postcode', field])
+            .size()
+            .unstack(fill_value=0)
+            .reset_index()
+        )
+
+        # Everything after the leading 'Postcode' column is a per-value count. Take the slice once, before the
+        # summary columns are added, so that every statistic is computed from the counts alone.
+        counts = field_counts.iloc[:, 1:]
+        number_of_properties = counts.sum(axis=1)
+        dominant_value = counts.idxmax(axis=1)
+        dominant_percentage = (counts.max(axis=1) / number_of_properties) * 100
+        mixed_type = counts.apply(check_mixed_types, axis=1)
+
+        # Add these as new columns after computation
+        field_counts['Dominant Value'] = dominant_value
+        field_counts['% Dominant'] = dominant_percentage
+        field_counts['Number of Properties'] = number_of_properties
+        field_counts['Mixed Type'] = mixed_type
+
+        # Store the result in the dictionary
+        aggregated_results[field] = field_counts
 
 # if __name__ == "__main__":
 #     main()