from typing import List

import pandas as pd

from etl.epc.Record import EPCDifferenceRecord
from etl.epc.ValidationConfiguration import DatasetValidationConfiguration
from etl.epc.settings import EARLIEST_EPC_DATE
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes
from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes
from etl.epc_clean.epc_attributes.MainheatControlAttributes import (
    MainheatControlAttributes,
)
from etl.epc_clean.epc_attributes.WindowAttributes import WindowAttributes
from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
    estimate_number_of_floors,
    get_wall_u_value,
    get_roof_u_value,
    get_floor_u_value,
    estimate_perimeter,
    get_wall_type,
)

# TODO: Can probably produce this in the property change app and store in S3
# Columns that are treated as boolean flags when filling missing values
# (see TrainingDataset._clean_missing_values).
BOOLEAN_VARIABLES = [
    "is_cavity_wall",
    "is_filled_cavity",
    "is_solid_brick",
    "is_system_built",
    "is_timber_frame",
    "is_granite_or_whinstone",
    "is_as_built",
    "is_cob",
    "is_sandstone_or_limestone",
    "is_park_home",
    "external_insulation",
    "internal_insulation",
    "is_park_home_ending",
    "external_insulation_ending",
    "internal_insulation_ending",
    "is_to_unheated_space",
    "is_to_external_air",
    "is_suspended",
    "is_solid",
    "another_property_below",
    "is_pitched",
    "is_roof_room",
    "is_loft",
    "is_flat",
    "is_thatched",
    "is_at_rafters",
    "has_dwelling_above",
    "has_radiators",
    "has_fan_coil_units",
    "has_pipes_in_screed_above_insulation",
    "has_pipes_in_insulated_timber_floor",
    "has_pipes_in_concrete_slab",
    "has_boiler",
    "has_air_source_heat_pump",
    "has_room_heaters",
    "has_electric_storage_heaters",
    "has_warm_air",
    "has_electric_underfloor_heating",
    "has_electric_ceiling_heating",
    "has_community_scheme",
    "has_ground_source_heat_pump",
    "has_no_system_present",
    "has_portable_electric_heaters",
    "has_water_source_heat_pump",
    "has_electric_heat_pump",
    "has_micro-cogeneration",
    "has_solar_assisted_heat_pump",
    "has_exhaust_source_heat_pump",
    "has_community_heat_pump",
    "has_electric",
    "has_mains_gas",
    "has_wood_logs",
    "has_coal",
    "has_oil",
    "has_wood_pellets",
    "has_anthracite",
    "has_dual_fuel_mineral_and_wood",
    "has_smokeless_fuel",
    "has_lpg",
    "has_b30k",
    "has_electricaire",
    "has_assumed_for_most_rooms",
    "has_underfloor_heating",
    "has_radiators_ending",
    "has_fan_coil_units_ending",
    "has_pipes_in_screed_above_insulation_ending",
    "has_pipes_in_insulated_timber_floor_ending",
    "has_pipes_in_concrete_slab_ending",
    "has_boiler_ending",
    "has_air_source_heat_pump_ending",
    "has_room_heaters_ending",
    "has_electric_storage_heaters_ending",
    "has_warm_air_ending",
    "has_electric_underfloor_heating_ending",
    "has_electric_ceiling_heating_ending",
    "has_community_scheme_ending",
    "has_ground_source_heat_pump_ending",
    "has_no_system_present_ending",
    "has_portable_electric_heaters_ending",
    "has_water_source_heat_pump_ending",
    "has_electric_heat_pump_ending",
    "has_micro-cogeneration_ending",
    "has_solar_assisted_heat_pump_ending",
    "has_exhaust_source_heat_pump_ending",
    "has_community_heat_pump_ending",
    "has_electric_ending",
    "has_mains_gas_ending",
    "has_wood_logs_ending",
    "has_coal_ending",
    "has_oil_ending",
    "has_wood_pellets_ending",
    "has_anthracite_ending",
    "has_dual_fuel_mineral_and_wood_ending",
    "has_smokeless_fuel_ending",
    "has_lpg_ending",
    "has_b30k_ending",
    "has_electricaire_ending",
    "has_assumed_for_most_rooms_ending",
    "has_underfloor_heating_ending",
    "multiple_room_thermostats",
    "multiple_room_thermostats_ending",
    "is_community",
    "no_individual_heating_or_community_network",
    "is_community_ending",
    "no_individual_heating_or_community_network_ending",
]


class BaseDataset:
    """
    Base class for all datasets
    """

    def __init__(self) -> None:
        self.pipeline_steps = {}

    def validate_dataset(self):
        """
        Validate the dataset against the validation configuration

        Currently this only stores ``DatasetValidationConfiguration`` on the
        instance; no checks are run here.
        """
        # NOTE(review): annotated as dict but assigned the imported
        # DatasetValidationConfiguration object as-is - confirm intended type.
        self.dataset_validation: dict = DatasetValidationConfiguration

    # def pipeline_factory(self, pipeline_type: str) -> dict:
    #     """
    #     Factory method for creating a pipeline
    #     """
    #     if pipeline_type not in self.pipeline_steps:
    #         raise ValueError(f"Pipeline type {pipeline_type} not found")
    #     return self.pipeline_steps[pipeline_type]


class TrainingDataset(BaseDataset):
    """
    A collection of EPCDifferenceRecords can be combined into a TrainingDataset.

    The constructor builds ``self.df`` from the records' ``difference_record``
    values and runs the full cleaning / feature pipeline on it in place.
    """

    # Building-fabric flags that must be identical in the starting and ending
    # EPC for a record to be kept (see _drop_inconsistent_properties).
    _CONSISTENCY_FEATURES = {
        "walls": [
            "is_cavity_wall",
            "is_solid_brick",
            "is_timber_frame",
            "is_granite_or_whinstone",
            "is_cob",
            "is_sandstone_or_limestone",
        ],
        "floor": [
            "is_suspended",
            "is_solid",
            "another_property_below",
            "is_to_unheated_space",
            "is_to_external_air",
        ],
        "roof": [
            "is_pitched",
            "is_roof_room",
            "is_loft",
            "is_flat",
            "is_thatched",
            "is_at_rafters",
            "has_dwelling_above",
        ],
    }

    def __init__(
        self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict
    ) -> None:
        """
        :param datasets: records whose ``difference_record`` becomes one row each
        :param cleaned_lookup: cleaned description lookups, keyed by
            ``"<component>-description"`` (and ``"main-fuel"``)
        :raises ValueError: if null values remain after a cleaning step
        """
        # Ensures pipeline_steps from BaseDataset exists on the instance.
        super().__init__()
        # self.pipeline_steps = self.pipeline_factory("training")
        self.datasets = datasets
        self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])

        # The order of these steps matters: later steps rely on columns that
        # earlier steps create, rename or drop.
        self._feature_generation()
        # self._drop_features()
        self._clean_efficiency_variables()
        self._null_validation(information="Clean Efficiency Variables")
        self._expand_description_to_features(cleaned_lookup)
        self._adjust_assumed_values_in_wall_descriptions()
        self._generate_u_values_from_features()
        self._clean_missing_values()
        self._null_validation(information="Clean Missing Values")
        self._remove_abnormal_change_in_floor_area()
        self._ensure_numeric()
        self._organise_starting_ending_columns()

    def _organise_starting_ending_columns(self):
        """
        Organise the starting and ending columns so that they are next to
        each other

        Resulting order: columns without a suffix, then columns that only
        exist as ``_ending``, then each ``_starting``/``_ending`` pair.
        ``_starting`` columns without a matching ``_ending`` column are not
        selected.
        """
        no_suffix_cols = [
            col
            for col in self.df.columns
            if "_ending" not in col and "_starting" not in col
        ]
        starting_cols = [col for col in self.df.columns if "_starting" in col]
        ending_cols = [col for col in self.df.columns if "_ending" in col]

        common_stems = [
            col.rsplit("_", 1)[0]
            for col in starting_cols
            if col.replace("_starting", "_ending") in ending_cols
        ]
        only_ending_cols = [
            col
            for col in ending_cols
            if col.replace("_ending", "_starting") not in starting_cols
        ]
        paired_cols = [
            col for stem in common_stems for col in (stem + "_starting", stem + "_ending")
        ]

        self.df = self.df.loc[:, no_suffix_cols + only_ending_cols + paired_cols]

    def _remove_abnormal_change_in_floor_area(
        self, max_proportional_change: float = 0.5
    ):
        """
        Remove properties where the total floor area changed too much

        Keeps only rows where ``|ending - starting| / starting`` is strictly
        below ``max_proportional_change`` (default 0.5, i.e. 50%). Rows where
        the ratio cannot be computed (NaN) are removed as well.

        :param max_proportional_change: exclusive upper bound on the
            proportional change in total floor area
        """
        starting_area = self.df["total_floor_area_starting"]
        proportional_change = (
            self.df["total_floor_area_ending"] - starting_area
        ).abs() / starting_area
        self.df = self.df[proportional_change < max_proportional_change]

    def _ensure_numeric(self):
        """
        Ensure that the thermal transmittance (u-value) columns are numeric

        The ``*_unit`` columns are left untouched.
        """
        # TODO: move into EPCRecord record
        uvalue_columns = [
            col
            for col in self.df.columns
            if "thermal_transmittance" in col and "_unit" not in col
        ]
        for uvalue_col in uvalue_columns:
            self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])

    @staticmethod
    def _lambda_function_to_generate_roof_uvalue(row, is_end=False):
        """
        Using the apply method, use the get_roof_u_value method to generate
        the u-value

        :param row: a row of the dataset
        :param is_end: use the ending insulation thickness instead of the
            starting one
        :raises ValueError: if there is a dwelling above and a recorded roof
            u-value (starting or ending) is not equal to 0
        """
        col_name = (
            "roof_insulation_thickness_ending"
            if is_end
            else "roof_insulation_thickness"
        )

        if row["has_dwelling_above"]:
            if row["roof_thermal_transmittance"] != 0:
                raise ValueError("Should have 0 u-value for roof")
            if row["roof_thermal_transmittance_ending"] != 0:
                raise ValueError("Should have 0 u-value for roof")

        return get_roof_u_value(
            insulation_thickness=row[col_name],
            has_dwelling_above=row["has_dwelling_above"],
            is_loft=row["is_loft"],
            is_roof_room=row["is_roof_room"],
            is_thatched=row["is_thatched"],
            is_flat=row["is_flat"],
            is_pitched=row["is_pitched"],
            is_at_rafters=row["is_at_rafters"],
            age_band=england_wales_age_band_lookup[row["construction_age_band"]],
        )

    @staticmethod
    def _lambda_function_to_generate_wall_uvalue(row, is_end=False):
        """
        Using the apply method, use the get_wall_u_value method to generate
        the u-value

        The recorded thermal transmittance is returned when present; the
        u-value is only derived from the cleaned description when it is null.

        :param row: a row of the dataset
        :param is_end: use the ending description / transmittance columns
        """
        suffix = "_ending" if is_end else ""
        description_col_name = f"walls_clean_description{suffix}"
        thermal_transmittance_col_name = f"walls_thermal_transmittance{suffix}"

        if not pd.isnull(row[thermal_transmittance_col_name]):
            return row[thermal_transmittance_col_name]

        return get_wall_u_value(
            clean_description=row[description_col_name],
            age_band=england_wales_age_band_lookup[row["construction_age_band"]],
            is_granite_or_whinstone=row["is_granite_or_whinstone"],
            is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
        )

    @staticmethod
    def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
        """
        Using the apply method, use the get_floor_u_value method to generate
        the u-value

        :param row: a row of the dataset
        :param is_end: use the ending transmittance, insulation, perimeter and
            floor area columns
        :raises ValueError: if there is another property below and a recorded
            floor u-value (starting or ending) is not equal to 0
        """
        if row["another_property_below"]:
            if row["floor_thermal_transmittance"] != 0:
                raise ValueError("Should have 0 u-value for floor")
            if row["floor_thermal_transmittance_ending"] != 0:
                raise ValueError("Should have 0 u-value for floor")
            return 0

        floor_thermal_col_name = (
            "floor_thermal_transmittance_ending"
            if is_end
            else "floor_thermal_transmittance"
        )
        uvalue = row[floor_thermal_col_name]
        if not pd.isnull(uvalue):
            return uvalue

        # No recorded value: derive it from the floor geometry and insulation.
        insulation_col_name = (
            "floor_insulation_thickness_ending"
            if is_end
            else "floor_insulation_thickness"
        )
        period = "ending" if is_end else "starting"
        return get_floor_u_value(
            floor_type=row["floor_type"],
            perimeter=row[f"estimated_perimeter_{period}"],
            area=row[f"ground_floor_area_{period}"],
            insulation_thickness=row[insulation_col_name],
            wall_type=row["wall_type"],
            age_band=england_wales_age_band_lookup[row["construction_age_band"]],
        )

    def _generate_u_values_from_features(self):
        """
        Generate u-values from the features

        For walls, roof and floor, any missing (or non-numeric) starting /
        ending thermal transmittance is filled with a u-value derived from the
        expanded description features. Helper columns only needed for that
        derivation are dropped afterwards.
        """
        # ~~~~~~~~~~~~~~~~~~
        # Walls
        # ~~~~~~~~~~~~~~~~~~
        walls_starting_uvalue = self.df.apply(
            self._lambda_function_to_generate_wall_uvalue, axis=1
        )
        walls_ending_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_wall_uvalue(
                row, is_end=True
            ),
            axis=1,
        )
        walls_starting_uvalue = self.df["walls_thermal_transmittance"].fillna(
            walls_starting_uvalue
        )
        # When the wall description did not change, reuse the starting u-value
        # as the generated ending u-value.
        walls_starting_equals_ending_flag = (
            self.df["walls_clean_description"]
            == self.df["walls_clean_description_ending"]
        )
        walls_ending_uvalue[walls_starting_equals_ending_flag] = (
            walls_starting_uvalue[walls_starting_equals_ending_flag]
        )

        # ~~~~~~~~~~~~~~~~~~
        # Roof
        # ~~~~~~~~~~~~~~~~~~
        roof_starting_uvalue = self.df.apply(
            self._lambda_function_to_generate_roof_uvalue, axis=1
        )
        roof_ending_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_roof_uvalue(
                row, is_end=True
            ),
            axis=1,
        )

        # ~~~~~~~~~~~~~~~~~~
        # Floor
        # ~~~~~~~~~~~~~~~~~~
        self.df["estimated_number_of_floors"] = self.df.apply(
            lambda row: estimate_number_of_floors(row["property_type"]), axis=1
        )
        self.df["ground_floor_area_starting"] = (
            self.df["total_floor_area_starting"]
            / self.df["estimated_number_of_floors"]
        )
        self.df["ground_floor_area_ending"] = (
            self.df["total_floor_area_ending"] / self.df["estimated_number_of_floors"]
        )
        self.df["estimated_perimeter_starting"] = self.df.apply(
            lambda row: estimate_perimeter(
                row["ground_floor_area_starting"],
                row["number_habitable_rooms_starting"]
                / row["estimated_number_of_floors"],
            ),
            axis=1,
        )
        # NOTE: this previously used ground_floor_area_starting, which paired
        # the ending room count with the starting floor area (copy-paste slip).
        self.df["estimated_perimeter_ending"] = self.df.apply(
            lambda row: estimate_perimeter(
                row["ground_floor_area_ending"],
                row["number_habitable_rooms_ending"]
                / row["estimated_number_of_floors"],
            ),
            axis=1,
        )
        self.df["floor_type"] = self.df["is_suspended"].replace(
            {True: "suspended", False: "solid"}
        )
        self.df["wall_type"] = self.df.apply(
            lambda row: get_wall_type(
                is_cavity_wall=row["is_cavity_wall"],
                is_solid_brick=row["is_solid_brick"],
                is_timber_frame=row["is_timber_frame"],
                is_granite_or_whinstone=row["is_granite_or_whinstone"],
                is_cob=row["is_cob"],
                is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
                is_system_built=row["is_system_built"],
                is_park_home=row["is_park_home"],
            ),
            axis=1,
        )
        floor_starting_uvalue = self.df.apply(
            self._lambda_function_to_generate_floor_uvalue, axis=1
        )
        floor_ending_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_floor_uvalue(
                row, is_end=True
            ),
            axis=1,
        )

        # Explicit mapping instead of eval() on local variable names.
        generated_uvalues = {
            "walls": (walls_starting_uvalue, walls_ending_uvalue),
            "roof": (roof_starting_uvalue, roof_ending_uvalue),
            "floor": (floor_starting_uvalue, floor_ending_uvalue),
        }
        for component, (starting_uvalue, ending_uvalue) in generated_uvalues.items():
            starting_col = f"{component}_thermal_transmittance"
            ending_col = f"{component}_thermal_transmittance_ending"
            # Recorded values win; generated values only fill the gaps.
            self.df[starting_col] = pd.to_numeric(
                self.df[starting_col], errors="coerce"
            ).fillna(starting_uvalue)
            self.df[ending_col] = pd.to_numeric(
                self.df[ending_col], errors="coerce"
            ).fillna(ending_uvalue)

        self.df = self.df.drop(
            columns=[
                "floor_type",
                "wall_type",
                "walls_clean_description",
                "walls_clean_description_ending",
                "estimated_number_of_floors",
                "ground_floor_area_starting",
                "ground_floor_area_ending",
            ]
        )

    def _adjust_assumed_values_in_wall_descriptions(self):
        """
        Strip out assumed values for all wall descriptions

        Removes the literal text ``(assumed)`` and any trailing whitespace.
        """
        for col in ["walls_clean_description", "walls_clean_description_ending"]:
            self.df[col] = (
                self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip()
            )

    def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str):
        """
        Drop properties that have inconsistent data, i.e. changing material
        types

        A row is kept when both the starting and ending original descriptions
        are empty, or when every fabric flag for the component is equal in
        the starting and ending EPC. Components other than walls, floor and
        roof are returned unchanged.

        :param expanded_df: dataset merged with the starting and ending
            cleaned lookups
        :param component: name of the component being processed
        :return: the filtered dataframe
        """
        features = self._CONSISTENCY_FEATURES.get(component)
        if not features:
            return expanded_df

        starting_and_finishing_null = expanded_df["original_description"].isin(
            [None, ""]
        ) & expanded_df["original_description_ending"].isin([None, ""])

        consistent = None
        for feature in features:
            unchanged = expanded_df[feature] == expanded_df[f"{feature}_ending"]
            consistent = unchanged if consistent is None else consistent & unchanged

        return expanded_df[starting_and_finishing_null | consistent]

    def _expand_description_to_features(self, cleaned_lookup: dict):
        """
        This method will merge on the cleaned lookup table and ensure that the
        building fabric in the starting and ending EPC is consistent, to
        ensure that we are performing our modelling on the cleanest possible
        dataset.

        We look for key building fabric features that have changed from one
        EPC to the next. If, for example, we see that a home has gone from
        being a cavity wall to a solid wall, we remove this record, as it
        indicates that the quality of the EPC conducted in the first instance
        is low.

        We also replace descriptions with their cleaned variants.

        :param cleaned_lookup: cleaned description records, keyed by
            ``"<component>-description"`` (and ``"main-fuel"``)
        """
        cols_to_drop = {
            "walls": [
                # We need the cleaned descriptions for pulling out u-values
                "original_description",
                # "thermal_transmittance_unit",
                "original_description_ending",
                # "thermal_transmittance_unit_ending",
                "is_cavity_wall_ending",
                "is_solid_brick_ending",
                "is_system_built_ending",
                "is_timber_frame_ending",
                "is_granite_or_whinstone_ending",
                # "is_as_built_ending",
                "is_cob_ending",
                "is_sandstone_or_limestone_ending",
                # We remove the is_assumed columns
                # "is_assumed",
                # "is_assumed_ending",
            ],
            "floor": [
                "original_description",
                "clean_description",
                "thermal_transmittance_unit",
                "no_data",
                "no_data_ending",
                "original_description_ending",
                "clean_description_ending",
                "thermal_transmittance_unit_ending",
                "is_suspended_ending",
                "is_solid_ending",
                "another_property_below_ending",
                "is_to_unheated_space_ending",
                "is_to_external_air_ending",
                "is_assumed",
                "is_assumed_ending",
            ],
            "roof": [
                "original_description",
                "clean_description",
                "thermal_transmittance_unit",
                "is_assumed",
                "is_valid",
                "original_description_ending",
                "clean_description_ending",
                "thermal_transmittance_unit_ending",
                "is_pitched_ending",
                "is_roof_room_ending",
                "is_loft_ending",
                "is_flat_ending",
                "is_thatched_ending",
                "has_dwelling_above_ending",
                "is_assumed_ending",
                "is_valid_ending",
            ],
            "hotwater": [
                "original_description",
                "clean_description",
                "assumed",
                "original_description_ending",
                "clean_description_ending",
                "assumed_ending",
            ],
            "mainheat": [
                "original_description",
                "clean_description",
                "has_assumed",
                "original_description_ending",
                "clean_description_ending",
                "has_assumed_ending",
            ],
            "mainheatcont": [
                "original_description",
                "clean_description",
                "original_description_ending",
                "clean_description_ending",
            ],
            "windows": [
                "original_description",
                "clean_description",
                "original_description_ending",
                "clean_description_ending",
                # We don't need many of the glazing coverage features because
                # we have the multi_glaze_proportion feature
                "has_glazing",
                "glazing_coverage",
                "no_data",
                "has_glazing_ending",
                "glazing_coverage_ending",
                "no_data_ending",
            ],
            "main-fuel": [
                "original_description",
                "clean_description",
                "original_description_ending",
                "clean_description_ending",
            ],
        }

        # Cleaner class used for descriptions missing from the lookup.
        cleaning_lookup = {
            "walls": WallAttributes,
            "floor": FloorAttributes,
            "roof": RoofAttributes,
            "hotwater": HotWaterAttributes,
            "mainheat": MainHeatAttributes,
            "mainheatcont": MainheatControlAttributes,
            "windows": WindowAttributes,
            "main-fuel": MainFuelAttributes,
        }

        for component, component_cols_to_drop in cols_to_drop.items():
            if component == "main-fuel":
                cleaned_key = "main-fuel"
                left_on_starting = "main_fuel_starting"
                left_on_ending = "main_fuel_ending"
            else:
                cleaned_key = f"{component}-description"
                left_on_starting = f"{component}_description_starting"
                left_on_ending = f"{component}_description_ending"
            original_cols = [left_on_starting, left_on_ending]

            cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])

            # We handle a specific edge case where we're missing information
            # for the original description
            # NOTE(review): only the starting descriptions are checked here;
            # confirm whether ending descriptions should be covered as well.
            descriptions = [
                x for x in self.df[left_on_starting].unique() if pd.notnull(x)
            ]
            # take any not in the cleaned lookup
            known_descriptions = cleaned_lookup_df_for_key[
                "original_description"
            ].values
            missing_descriptions = [
                x for x in descriptions if x not in known_descriptions
            ]

            if missing_descriptions:
                # We handle them here by cleaning the description on the fly
                cleaner = cleaning_lookup[component]
                cleaned_data = []
                for x in missing_descriptions:
                    desc_cleaner = cleaner(x)
                    cleaned = desc_cleaner.process()

                    # IF NODATA, REMAP TO NONE VALUES, apart from walls which
                    # we want to keep as is. If we convert the walls data to
                    # None, we end up converting booleans to None which causes
                    # issues downstream
                    if (
                        all(
                            (pd.DataFrame(cleaned, index=[0]).T)[0]
                            == False  # noqa: E712
                        )
                        and component != "walls"
                    ):
                        cleaned = {key: None for key in cleaned.keys()}

                    cleaned_data.append(
                        {
                            "original_description": x,
                            "clean_description": desc_cleaner.description.replace(
                                "(assumed)", ""
                            )
                            .rstrip()
                            .capitalize(),
                            **cleaned,
                        }
                    )

                cleaned_lookup_df_for_key = pd.concat(
                    [
                        cleaned_lookup_df_for_key,
                        pd.DataFrame(cleaned_data),
                    ],
                    ignore_index=True,
                )

            # First merge attaches the starting features (unsuffixed), the
            # second attaches the ending features with an "_ending" suffix.
            expanded_df = self.df.merge(
                cleaned_lookup_df_for_key,
                how="left",
                left_on=left_on_starting,
                right_on="original_description",
            ).merge(
                cleaned_lookup_df_for_key,
                how="left",
                left_on=left_on_ending,
                right_on="original_description",
                suffixes=("", "_ending"),
            )

            # Drop properties where key material types have changed
            expanded_df = self._drop_inconsistent_properties(expanded_df, component)

            # Drop original cols and cols to drop
            expanded_df = expanded_df.drop(
                columns=component_cols_to_drop + original_cols
            )

            # Rename columns to component specific names, if they have not
            # been dropped
            generic_cols = [
                "is_assumed",
                "insulation_thickness",
                "thermal_transmittance",
                "thermal_transmittance_unit",
                "tariff_type",
                "clean_description",
            ]
            rename_map = {}
            for generic_col in generic_cols:
                rename_map[generic_col] = f"{component}_{generic_col}"
                rename_map[f"{generic_col}_ending"] = (
                    f"{component}_{generic_col}_ending"
                )
            expanded_df = expanded_df.rename(columns=rename_map)

            self.df = expanded_df

        # We don't need any lighting specific cleaning, we just drop the
        # original description as we use
        # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
        self.df = self.df.drop(
            columns=["lighting_description_starting", "lighting_description_ending"]
        )

    def _clean_missing_values(self, ignore_cols=None):
        """
        Fill the remaining missing values in the dataset

        Columns that contain True/False values or are listed in
        BOOLEAN_VARIABLES are filled with False; columns that already contain
        the value "none" are filled with "none"; everything else is filled
        with "Unknown".

        :param ignore_cols: optional collection of column names to leave as is
        """
        missings = pd.isnull(self.df).sum()
        missings = missings[missings > 0]
        if ignore_cols:
            missings = missings[~missings.index.isin(ignore_cols)]

        for col in missings.index:
            unique_values = self.df[col].unique()
            # NOTE(review): `True in unique_values` also matches the numbers
            # 1 / 0, so numeric columns containing them take this branch.
            is_boolean_col = (
                (True in unique_values)
                or (False in unique_values)
                or (col in BOOLEAN_VARIABLES)
            )
            if is_boolean_col:
                fill_value = False
            elif "none" in unique_values:
                fill_value = "none"
            else:
                fill_value = "Unknown"
            self.df[col] = self.df[col].fillna(fill_value)

    def _null_validation(self, information: str):
        """
        Raise if the dataset still contains null values

        :param information: name of the step that just ran, used in the error
        :raises ValueError: if any null value is present in ``self.df``
        """
        # print(f"Null validation after {information}")
        if pd.isnull(self.df).sum().sum():
            raise ValueError(f"Null values found in dataset, after step {information}")

    def _drop_features(self):
        """
        Drop features that are not needed for modelling
        """
        self.df = self.df.drop(
            columns=["lodgement_date_starting", "lodgement_date_ending"]
        )

    def _feature_generation(self):
        """
        Generate features for modelling

        Adds ``days_to_starting`` / ``days_to_ending``: the number of days
        between EARLIEST_EPC_DATE and the respective lodgement date.
        """
        self.df["days_to_starting"] = self._calculate_days_to(
            self.df["lodgement_date_starting"]
        )
        self.df["days_to_ending"] = self._calculate_days_to(
            self.df["lodgement_date_ending"]
        )

    def _clean_efficiency_variables(self):
        """
        Fill missing energy efficiency ratings with "NO_RATING"

        There is scope to clean this by the model per corresponding
        description. E.g. for WALLS_ENG_EFF we could look at the mode
        efficiency rating by description and fill in the missing values with
        this. When looking at this initially, there were a large volume of
        records with missing energy efficiency values and therefore a simpler
        approach was taken just to test including these variables.

        :raises ValueError: if a column with missing values is not an
            efficiency column (its name does not contain "energy_eff")
        """
        missings = pd.isnull(self.df).sum()
        missings = missings[missings >= 1]

        if len(missings) == 0:
            return

        # Make sure they are all efficiency columns
        if any(~missings.index.str.contains("energy_eff")):
            raise ValueError(f"Non efficiency columns are missing {missings.index}")

        for m in missings.index:
            self.df[m] = self.df[m].fillna("NO_RATING")

    @staticmethod
    def _calculate_days_to(lodgement_date):
        """
        Number of days between EARLIEST_EPC_DATE and the lodgement date(s)

        :param lodgement_date: a single date string, or a pandas Series of
            dates
        :return: an int for a string input, otherwise a Series of day counts
        """
        elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
        # A single string yields a Timedelta (.days); a Series needs .dt.days
        if isinstance(lodgement_date, str):
            return elapsed.days
        return elapsed.dt.days

    # def __add__(self, other) -> "TrainingDataset":
    #     if not isinstance(other, TrainingDataset):
    #         raise TypeError("Addition can only be performed with another instance of TrainingDataset")
    #     return TrainingDataset(self.datasets + other.datasets)

    # def __radd__(self, other):
    #     """
    #     Required for sum() to work
    #     """
    #     if isinstance(other, int):
    #         return self
    #     else:
    #         return self.__add__(other)


class NewDataset(BaseDataset):
    """
    A collection of EPCDifferenceRecords can be combined into a NewDataset.

    Instances can be added together (and therefore used with ``sum()``),
    which concatenates their record lists.
    """

    def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
        # Ensures pipeline_steps from BaseDataset exists on the instance.
        super().__init__()
        # self.pipeline_steps = self.pipeline_factory("newdata")
        self.datasets = datasets

    def __add__(self, other) -> "NewDataset":
        """
        :raises TypeError: if ``other`` is not a NewDataset
        """
        if not isinstance(other, NewDataset):
            raise TypeError(
                "Addition can only be performed with another instance of NewDataset"
            )
        return NewDataset(self.datasets + other.datasets)

    def __radd__(self, other):
        """
        Required for sum() to work
        """
        # sum() starts from the int 0, so an int on the left is ignored.
        if isinstance(other, int):
            return self
        return self.__add__(other)