diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 5bb54088..1946c1a4 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -62,17 +62,12 @@ def is_int(x): return False -class DataProcessor: +class EPCDataProcessor: """ Handle data loading and data preprocessing """ training_pipeline = { - "load_data": { - "function": "load_data", - "args": [], - "kwargs": {"low_memory": DATA_PROCESSOR_SETTINGS["low_memory"]}, - }, "confine_data": { "function": "confine_data", "args": [], @@ -174,8 +169,8 @@ class DataProcessor: In this instance, there are some operations we do not want to perform, such as confine_data() """ - self.data : pd.DataFrame = None - self.cleaning_averages : pd.DataFrame = None + self.data : pd.DataFrame + self.cleaning_averages : pd.DataFrame self.filepath = filepath self.pipeline_steps = self.pipeline_factory("newdata" if is_newdata else "training") @@ -183,22 +178,6 @@ class DataProcessor: self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) - def pre_process_pipeline(self) -> None: - """ - For the pipeline_steps, we apply each function in turn - """ - - for step in self.pipeline_steps: - step_function = getattr(self, self.pipeline_steps[step]["function"]) - step_args = self.pipeline_steps[step]["args"] - step_kwargs = self.pipeline_steps[step]["kwargs"] - - if step_args: - step_function(*step_args, **step_kwargs) - else: - step_function(**step_kwargs) - - def pipeline_factory(self, pipeline_type: str) -> dict: """ Determine which dataclient to use @@ -213,6 +192,23 @@ class DataProcessor: raise ValueError("Pipeline type specified is not in factory") return pipelines[pipeline_type] + + + def pre_process_pipeline(self) -> None: + """ + For the pipeline_steps, we apply each function in turn + This will alter self.data inplace + """ + + for step in self.pipeline_steps: + step_function = getattr(self, self.pipeline_steps[step]["function"]) + step_args = self.pipeline_steps[step]["args"] + step_kwargs = 
self.pipeline_steps[step]["kwargs"] + + if step_args: + step_function(*step_args, **step_kwargs) + else: + step_function(**step_kwargs) def load_data(self, low_memory=False) -> None: @@ -305,7 +301,7 @@ class DataProcessor: break to_index -= 1 - def pre_process(self) -> pd.DataFrame: + def pre_process(self): """ Load data and begin initial cleaning """ @@ -346,7 +342,7 @@ class DataProcessor: # Final re-casting after data transformed and prepared coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES for k, v in coltypes.items(): - self.data[k] = self.data[k].astype(v) + self.data[k] = self.data[k].astype(v) self.data = self.data.astype(coltypes) self.na_remapping() @@ -365,6 +361,8 @@ class DataProcessor: self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] + self.data.columns = self.data.columns.str.lower() + def na_remapping(self): diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 3c38eaa6..e364d0f0 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -1,6 +1,6 @@ import pandas as pd from typing import List -from etl.epc.EPCRecord import EPCDifferenceRecord +from etl.epc.Record import EPCDifferenceRecord from ValidationConfiguration import DatasetValidationConfiguration from etl.epc.settings import EARLIEST_EPC_DATE @@ -19,14 +19,14 @@ class BaseDataset: """ self.dataset_validation: dict = DatasetValidationConfiguration - def pipeline_factory(self, pipeline_type: str) -> dict: - """ - Factory method for creating a pipeline - """ - if pipeline_type not in self.pipeline_steps: - raise ValueError(f"Pipeline type {pipeline_type} not found") + # def pipeline_factory(self, pipeline_type: str) -> dict: + # """ + # Factory method for creating a pipeline + # """ + # if pipeline_type not in self.pipeline_steps: + # raise ValueError(f"Pipeline type {pipeline_type} not found") - return self.pipeline_steps[pipeline_type] + # return self.pipeline_steps[pipeline_type] class 
TrainingDataset(BaseDataset): """ @@ -34,7 +34,7 @@ class TrainingDataset(BaseDataset): """ def __init__(self, datasets: List[EPCDifferenceRecord]) -> None: - self.pipeline_steps = self.pipeline_factory("training") + # self.pipeline_steps = self.pipeline_factory("training") self.datasets = datasets self.df = pd.DataFrame([dataset.difference_record for dataset in datasets]) @@ -42,11 +42,11 @@ class TrainingDataset(BaseDataset): self._feature_generation() self._drop_features() # self._clean_dataframe() - self._clean_efficiency_variables() - self._null_validation(information="Clean Efficiency Variables") - # self._process_and_prune() - self._clean_missing_values() - self._null_validation(information="Clean Missing Values") + # self._clean_efficiency_variables() + # self._null_validation(information="Clean Efficiency Variables") + # # self._process_and_prune() + # self._clean_missing_values() + # self._null_validation(information="Clean Missing Values") def _clean_missing_values(self, ignore_cols=None): diff --git a/etl/epc/EPCRecord.py b/etl/epc/Record.py similarity index 78% rename from etl/epc/EPCRecord.py rename to etl/epc/Record.py index a5079f39..4f136839 100644 --- a/etl/epc/EPCRecord.py +++ b/etl/epc/Record.py @@ -14,6 +14,13 @@ from etl.epc.settings import ( EFFICIENCY_FEATURES ) +# TODO: Change these in the settings file +RDSAP_RESPONSE = RDSAP_RESPONSE.lower() +HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() +CARBON_RESPONSE = CARBON_RESPONSE.lower() +COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] +EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES] + @dataclass class EPCRecord: """ @@ -24,47 +31,46 @@ class EPCRecord: floor = None roof = None - - UPRN: str - WALLS_DESCRIPTION: str - FLOOR_DESCRIPTION: str - LIGHTING_DESCRIPTION: str - ROOF_DESCRIPTION: str - MAINHEAT_DESCRIPTION: str - HOTWATER_DESCRIPTION: str - MAIN_FUEL: str - MECHANICAL_VENTILATION: str - SECONDHEAT_DESCRIPTION: str - WINDOWS_DESCRIPTION: str - GLAZED_TYPE: 
str - MULTI_GLAZE_PROPORTION: float - LOW_ENERGY_LIGHTING: float - NUMBER_OPEN_FIREPLACES: float - MAINHEATCONT_DESCRIPTION: str - SOLAR_WATER_HEATING_FLAG: str - PHOTO_SUPPLY: float - TRANSACTION_TYPE: str - ENERGY_TARIFF: str - EXTENSION_COUNT: float - TOTAL_FLOOR_AREA: float - FLOOR_HEIGHT: float - HOT_WATER_ENERGY_EFF: str - FLOOR_ENERGY_EFF: str - WINDOWS_ENERGY_EFF: str - WALLS_ENERGY_EFF: str - SHEATING_ENERGY_EFF: str - ROOF_ENERGY_EFF: str - MAINHEAT_ENERGY_EFF: str - MAINHEATC_ENERGY_EFF: str - LIGHTING_ENERGY_EFF: str - POTENTIAL_ENERGY_EFFICIENCY: float - ENVIRONMENT_IMPACT_POTENTIAL: float - ENERGY_CONSUMPTION_POTENTIAL: float - CO2_EMISSIONS_POTENTIAL: float - LODGEMENT_DATE: str - CURRENT_ENERGY_EFFICIENCY: int - ENERGY_CONSUMPTION_CURRENT: int - CO2_EMISSIONS_CURRENT: float + uprn: str + walls_description: str + floor_description : str + lighting_description : str + roof_description : str + mainheat_description : str + hotwater_description : str + main_fuel : str + mechanical_ventilation : str + secondheat_description : str + windows_description : str + glazed_type : str + multi_glaze_proportion : float + low_energy_lighting : float + number_open_fireplaces : float + mainheatcont_description : str + solar_water_heating_flag : str + photo_supply : float + transaction_type : str + energy_tariff : str + extension_count : float + total_floor_area : float + floor_height : float + hot_water_energy_eff : str + floor_energy_eff : str + windows_energy_eff : str + walls_energy_eff : str + sheating_energy_eff : str + roof_energy_eff : str + mainheat_energy_eff : str + mainheatc_energy_eff : str + lighting_energy_eff : str + potential_energy_efficiency : float + environment_impact_potential : float + energy_consumption_potential : float + co2_emissions_potential : float + lodgement_date : str + current_energy_efficiency : int + energy_consumption_current : int + co2_emissions_current : float u_values_walls = None u_values_roof = None @@ -79,9 +85,9 @@ class 
EPCRecord: # self._field_validation() self._clean_records() self._expand_description() - self._generate_uvalues() - self._validate_expanded_description() - self._validate_u_values() + # self._generate_uvalues() + # self._validate_expanded_description() + # self._validate_u_values() # etc pass @@ -227,6 +233,7 @@ class EPCDifferenceRecord: """ self.record1 = record1 self.record2 = record2 + self.earliest_record = record1 if record1.lodgement_date < record2.lodgement_date else record2 self.flag_fabric_consistency = False self.difference_record = {} @@ -239,7 +246,7 @@ class EPCDifferenceRecord: self._construct_difference_record() self._validate_difference_record() - self._detect_fabric_consistency() + # self._detect_fabric_consistency() def _construct_difference_record(self): @@ -257,20 +264,20 @@ class EPCDifferenceRecord: # TODO: Take the earliest potentials self.difference_record = { - "UPRN": self.record1.get("UPRN"), - "RDSAP_CHANGE": rdsap_change, - "HEAT_DEMAND_CHANGE": heat_demand_change, - "CARBON_CHANGE": carbon_change, - "SAP_STARTING": self.record1.get(RDSAP_RESPONSE), - "SAP_ENDING": self.record2.get(RDSAP_RESPONSE), - "HEAT_DEMAND_STARTING": self.record1.get(HEAT_DEMAND_RESPONSE), - "HEAT_DEMAND_ENDING": self.record2.get(HEAT_DEMAND_RESPONSE), - "CARBON_STARTING": self.record1.get(CARBON_RESPONSE), - "CARBON_ENDING": self.record2.get(CARBON_RESPONSE), - "POTENTIAL_ENERGY_EFFICIENCY": max(self.record1.get("POTENTIAL_ENERGY_EFFICIENCY"), self.record2.get("POTENTIAL_ENERGY_EFFICIENCY")), - "ENVIRONMENT_IMPACT_POTENTIAL": max(self.record1.get("ENVIRONMENT_IMPACT_POTENTIAL"), self.record2.get("ENVIRONMENT_IMPACT_POTENTIAL")), - "ENERGY_CONSUMPTION_POTENTIAL": max(self.record1.get("ENERGY_CONSUMPTION_POTENTIAL"), self.record2.get("ENERGY_CONSUMPTION_POTENTIAL")), - "CO2_EMISSIONS_POTENTIAL": max(self.record1.get("CO2_EMISSIONS_POTENTIAL"), self.record2.get("CO2_EMISSIONS_POTENTIAL")), + "uprn": self.record1.get("uprn"), + "rdsap_change": rdsap_change, + 
"heat_demand_change": heat_demand_change, + "carbon_change": carbon_change, + "sap_starting": self.record1.get(RDSAP_RESPONSE), + "sap_ending": self.record2.get(RDSAP_RESPONSE), + "heat_demand_starting": self.record1.get(HEAT_DEMAND_RESPONSE), + "heat_demand_ending": self.record2.get(HEAT_DEMAND_RESPONSE), + "carbon_starting": self.record1.get(CARBON_RESPONSE), + "carbon_ending": self.record2.get(CARBON_RESPONSE), + "potential_energy_efficiency": self.earliest_record.get("potential_energy_efficiency"), + "environment_impact_potential": self.earliest_record.get("environment_impact_potential"), + "energy_consumption_potential": self.earliest_record.get("energy_consumption_potential"), + "co2_emissions_potential": self.earliest_record.get("co2_emissions_potential"), **ending_record, **starting_record } diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index 1aec2b55..ee60ecb3 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -3,6 +3,8 @@ import numpy as np from tqdm import tqdm import msgpack +from typing import List + from pathlib import Path from etl.epc.settings import ( MANDATORY_FIXED_FEATURES, @@ -10,15 +12,14 @@ from etl.epc.settings import ( COMPONENT_FEATURES, RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, - COLUMNS_TO_MERGE_ON, CARBON_RESPONSE, CORE_COMPONENT_FEATURES, EFFICIENCY_FEATURES, POTENTIAL_COLUMNS, MINIMUM_FLOOR_HEIGHT ) -from etl.epc.DataProcessor import DataProcessor -from etl.epc.EPCRecord import EPCRecord, EPCDifferenceRecord +from etl.epc.DataProcessor import EPCDataProcessor +from etl.epc.Record import EPCRecord, EPCDifferenceRecord from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3 from recommendations.rdsap_tables import england_wales_age_band_lookup from recommendations.recommendation_utils import ( @@ -28,6 +29,16 @@ from recommendations.recommendation_utils import ( DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates" +# TODO: change in 
setting file +MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES] +LATEST_FIELD = [x.lower() for x in LATEST_FIELD] +COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] +RDSAP_RESPONSE = RDSAP_RESPONSE.lower() +HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower() +CARBON_RESPONSE = CARBON_RESPONSE.lower() +CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES] +EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES] +POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS] def get_cleaned_description_mapping(): """ @@ -399,7 +410,6 @@ def make_uvalues(df): # if all_equal: # return True -from typing import List class EPCPipeline: """ This class will take a list of directories and process them to create a dataset: @@ -451,6 +461,43 @@ class EPCPipeline: """ pass + +def compare_consecutive_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_model_data: list, all_equal_rows: list): + """ + Compare each consecutive pair of EPC records for one UPRN, appending to both accumulator lists in place + :param epc_records: records for a single UPRN; element idx is paired with element idx + 1 + :return: tuple (property_model_data, all_equal_rows) - the same list objects that were passed in + """ + for idx in range(0, len(epc_records) - 1): + + # range() stops at len(epc_records) - 2, so idx + 1 is always a valid index + # and no explicit bounds check is needed inside the loop + + earliest_record: EPCRecord = epc_records[idx] + latest_record: EPCRecord = epc_records[idx + 1] + + # Auto sort the records so that the record with highest RDSAP score is always record1 + difference_record: EPCDifferenceRecord = latest_record - earliest_record + + # TODO: Pull out RDSAP_CHANGE to a variable + if difference_record.get("rdsap_change") == 0: + continue + + all_equal = difference_record.compare_fields_in_records( + fields=CORE_COMPONENT_FEATURES + ) + + if all_equal: + # Keep track of this for the moment so we can analyse + all_equal_rows.append({"uprn": uprn, "directory_name": directory.name}) + continue + + difference_record.append_fixed_data(fixed_data) + + property_model_data.append(difference_record) + + return property_model_data, all_equal_rows + def app(): # Get all the files in 
the directory @@ -462,8 +509,6 @@ def app(): # List all subdirectories directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - - dataset = [] cleaning_dataset = [] # Keep track of the all equals @@ -472,41 +517,32 @@ def app(): for directory in tqdm(directories): filepath = directory / "certificates.csv" - data_processor = DataProcessor(filepath=filepath) + epc_data_processor = EPCDataProcessor(filepath=filepath) - data_processor.pre_process() + epc_data_processor.pre_process() - df = data_processor.data + df = epc_data_processor.data - cleaning_dataset.append(data_processor.cleaning_averages) + cleaning_dataset.append(epc_data_processor.cleaning_averages) data_by_uprn = [] - for uprn, property_data in df.groupby("UPRN", observed=True): + for uprn, property_data in df.groupby("uprn", observed=True): - # Fixed features - these are property attributes that shouldn't change over time - fixed_data = {} - # If a property has changed building type, we can ignore the epc rating i.e. 
this should be 1 unique row if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or ( pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0 ): continue - # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS - latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict() - mandatory_field_data = ( - property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict() - ) - - # Combine all fields together - fixed_data.update(mandatory_field_data) - fixed_data.update(latest_field_data) + # Fixed features - these are property attributes that shouldn't change over time + # Take the latest row for both the LATEST_FIELD and MANDATORY_FIXED_FEATURES columns and combine all fields together + fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict() # We include the lodgement date here as we probably need to factor time into the # model, since EPC standards and rigour have changed over time variable_data = property_data[ COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [ - "LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE + "lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE ] ] @@ -516,6 +552,10 @@ def app(): epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')] + property_model_data, all_equal_rows = compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_model_data, all_equal_rows) + + + for idx in range(0, len(epc_records) - 1): if idx >= len(epc_records) - 1: