start refactor

This commit is contained in:
Michael Duong 2023-12-14 20:31:58 +00:00
parent c769c07843
commit 1ce9a4521f
4 changed files with 168 additions and 123 deletions

View file

@ -62,17 +62,12 @@ def is_int(x):
return False
class DataProcessor:
class EPCDataProcessor:
"""
Handle data loading and data preprocessing
"""
training_pipeline = {
"load_data": {
"function": "load_data",
"args": [],
"kwargs": {"low_memory": DATA_PROCESSOR_SETTINGS["low_memory"]},
},
"confine_data": {
"function": "confine_data",
"args": [],
@ -174,8 +169,8 @@ class DataProcessor:
In this instance, there are some operations we do not
want to perform, such as confine_data()
"""
self.data : pd.DataFrame = None
self.cleaning_averages : pd.DataFrame = None
self.data : pd.DataFrame
self.cleaning_averages : pd.DataFrame
self.filepath = filepath
self.pipeline_steps = self.pipeline_factory("newdata" if is_newdata else "training")
@ -183,22 +178,6 @@ class DataProcessor:
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
def pre_process_pipeline(self) -> None:
"""
For the pipeline_steps, we apply each function in turn
"""
for step in self.pipeline_steps:
step_function = getattr(self, self.pipeline_steps[step]["function"])
step_args = self.pipeline_steps[step]["args"]
step_kwargs = self.pipeline_steps[step]["kwargs"]
if step_args:
step_function(*step_args, **step_kwargs)
else:
step_function(**step_kwargs)
def pipeline_factory(self, pipeline_type: str) -> dict:
"""
Determine which dataclient to use
@ -213,6 +192,23 @@ class DataProcessor:
raise ValueError("Pipeline type specified is not in factory")
return pipelines[pipeline_type]
def pre_process_pipeline(self) -> None:
"""
For the pipeline_steps, we apply each function in turn
This will alter self.data inplace
"""
for step in self.pipeline_steps:
step_function = getattr(self, self.pipeline_steps[step]["function"])
step_args = self.pipeline_steps[step]["args"]
step_kwargs = self.pipeline_steps[step]["kwargs"]
if step_args:
step_function(*step_args, **step_kwargs)
else:
step_function(**step_kwargs)
def load_data(self, low_memory=False) -> None:
@ -305,7 +301,7 @@ class DataProcessor:
break
to_index -= 1
def pre_process(self) -> pd.DataFrame:
def pre_process(self):
"""
Load data and begin initial cleaning
"""
@ -346,7 +342,7 @@ class DataProcessor:
# Final re-casting after data transformed and prepared
coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES
for k, v in coltypes.items():
self.data[k] = self.data[k].astype(v)
self.data[k] = self.data[k].astype(v)
self.data = self.data.astype(coltypes)
self.na_remapping()
@ -365,6 +361,8 @@ class DataProcessor:
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
self.data.columns = self.data.columns.str.lower()
def na_remapping(self):

View file

@ -1,6 +1,6 @@
import pandas as pd
from typing import List
from etl.epc.EPCRecord import EPCDifferenceRecord
from etl.epc.Record import EPCDifferenceRecord
from ValidationConfiguration import DatasetValidationConfiguration
from etl.epc.settings import EARLIEST_EPC_DATE
@ -19,14 +19,14 @@ class BaseDataset:
"""
self.dataset_validation: dict = DatasetValidationConfiguration
def pipeline_factory(self, pipeline_type: str) -> dict:
"""
Factory method for creating a pipeline
"""
if pipeline_type not in self.pipeline_steps:
raise ValueError(f"Pipeline type {pipeline_type} not found")
# def pipeline_factory(self, pipeline_type: str) -> dict:
# """
# Factory method for creating a pipeline
# """
# if pipeline_type not in self.pipeline_steps:
# raise ValueError(f"Pipeline type {pipeline_type} not found")
return self.pipeline_steps[pipeline_type]
# return self.pipeline_steps[pipeline_type]
class TrainingDataset(BaseDataset):
"""
@ -34,7 +34,7 @@ class TrainingDataset(BaseDataset):
"""
def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
self.pipeline_steps = self.pipeline_factory("training")
# self.pipeline_steps = self.pipeline_factory("training")
self.datasets = datasets
self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])
@ -42,11 +42,11 @@ class TrainingDataset(BaseDataset):
self._feature_generation()
self._drop_features()
# self._clean_dataframe()
self._clean_efficiency_variables()
self._null_validation(information="Clean Efficiency Variables")
# self._process_and_prune()
self._clean_missing_values()
self._null_validation(information="Clean Missing Values")
# self._clean_efficiency_variables()
# self._null_validation(information="Clean Efficiency Variables")
# # self._process_and_prune()
# self._clean_missing_values()
# self._null_validation(information="Clean Missing Values")
def _clean_missing_values(self, ignore_cols=None):

View file

@ -14,6 +14,13 @@ from etl.epc.settings import (
EFFICIENCY_FEATURES
)
# TODO: Change these in the settings file
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
@dataclass
class EPCRecord:
"""
@ -24,47 +31,46 @@ class EPCRecord:
floor = None
roof = None
UPRN: str
WALLS_DESCRIPTION: str
FLOOR_DESCRIPTION: str
LIGHTING_DESCRIPTION: str
ROOF_DESCRIPTION: str
MAINHEAT_DESCRIPTION: str
HOTWATER_DESCRIPTION: str
MAIN_FUEL: str
MECHANICAL_VENTILATION: str
SECONDHEAT_DESCRIPTION: str
WINDOWS_DESCRIPTION: str
GLAZED_TYPE: str
MULTI_GLAZE_PROPORTION: float
LOW_ENERGY_LIGHTING: float
NUMBER_OPEN_FIREPLACES: float
MAINHEATCONT_DESCRIPTION: str
SOLAR_WATER_HEATING_FLAG: str
PHOTO_SUPPLY: float
TRANSACTION_TYPE: str
ENERGY_TARIFF: str
EXTENSION_COUNT: float
TOTAL_FLOOR_AREA: float
FLOOR_HEIGHT: float
HOT_WATER_ENERGY_EFF: str
FLOOR_ENERGY_EFF: str
WINDOWS_ENERGY_EFF: str
WALLS_ENERGY_EFF: str
SHEATING_ENERGY_EFF: str
ROOF_ENERGY_EFF: str
MAINHEAT_ENERGY_EFF: str
MAINHEATC_ENERGY_EFF: str
LIGHTING_ENERGY_EFF: str
POTENTIAL_ENERGY_EFFICIENCY: float
ENVIRONMENT_IMPACT_POTENTIAL: float
ENERGY_CONSUMPTION_POTENTIAL: float
CO2_EMISSIONS_POTENTIAL: float
LODGEMENT_DATE: str
CURRENT_ENERGY_EFFICIENCY: int
ENERGY_CONSUMPTION_CURRENT: int
CO2_EMISSIONS_CURRENT: float
uprn: str
walls_description: str
floor_description : str
lighting_description : str
roof_description : str
mainheat_description : str
hotwater_description : str
main_fuel : str
mechanical_ventilation : str
secondheat_description : str
windows_description : str
glazed_type : str
multi_glaze_proportion : float
low_energy_lighting : float
number_open_fireplaces : float
mainheatcont_description : str
solar_water_heating_flag : str
photo_supply : float
transaction_type : str
energy_tariff : str
extension_count : float
total_floor_area : float
floor_height : float
hot_water_energy_eff : str
floor_energy_eff : str
windows_energy_eff : str
walls_energy_eff : str
sheating_energy_eff : str
roof_energy_eff : str
mainheat_energy_eff : str
mainheatc_energy_eff : str
lighting_energy_eff : str
potential_energy_efficiency : float
environment_impact_potential : float
energy_consumption_potential : float
co2_emissions_potential : float
lodgement_date : str
current_energy_efficiency : int
energy_consumption_current : int
co2_emissions_current : float
u_values_walls = None
u_values_roof = None
@ -79,9 +85,9 @@ class EPCRecord:
# self._field_validation()
self._clean_records()
self._expand_description()
self._generate_uvalues()
self._validate_expanded_description()
self._validate_u_values()
# self._generate_uvalues()
# self._validate_expanded_description()
# self._validate_u_values()
# etc
pass
@ -227,6 +233,7 @@ class EPCDifferenceRecord:
"""
self.record1 = record1
self.record2 = record2
self.earliest_record = record1 if record1.lodgement_date < record2.lodgement_date else record2
self.flag_fabric_consistency = False
self.difference_record = {}
@ -239,7 +246,7 @@ class EPCDifferenceRecord:
self._construct_difference_record()
self._validate_difference_record()
self._detect_fabric_consistency()
# self._detect_fabric_consistency()
def _construct_difference_record(self):
@ -257,20 +264,20 @@ class EPCDifferenceRecord:
# TODO: Take the earliest potentials
self.difference_record = {
"UPRN": self.record1.get("UPRN"),
"RDSAP_CHANGE": rdsap_change,
"HEAT_DEMAND_CHANGE": heat_demand_change,
"CARBON_CHANGE": carbon_change,
"SAP_STARTING": self.record1.get(RDSAP_RESPONSE),
"SAP_ENDING": self.record2.get(RDSAP_RESPONSE),
"HEAT_DEMAND_STARTING": self.record1.get(HEAT_DEMAND_RESPONSE),
"HEAT_DEMAND_ENDING": self.record2.get(HEAT_DEMAND_RESPONSE),
"CARBON_STARTING": self.record1.get(CARBON_RESPONSE),
"CARBON_ENDING": self.record2.get(CARBON_RESPONSE),
"POTENTIAL_ENERGY_EFFICIENCY": max(self.record1.get("POTENTIAL_ENERGY_EFFICIENCY"), self.record2.get("POTENTIAL_ENERGY_EFFICIENCY")),
"ENVIRONMENT_IMPACT_POTENTIAL": max(self.record1.get("ENVIRONMENT_IMPACT_POTENTIAL"), self.record2.get("ENVIRONMENT_IMPACT_POTENTIAL")),
"ENERGY_CONSUMPTION_POTENTIAL": max(self.record1.get("ENERGY_CONSUMPTION_POTENTIAL"), self.record2.get("ENERGY_CONSUMPTION_POTENTIAL")),
"CO2_EMISSIONS_POTENTIAL": max(self.record1.get("CO2_EMISSIONS_POTENTIAL"), self.record2.get("CO2_EMISSIONS_POTENTIAL")),
"uprn": self.record1.get("UPRN"),
"rdsap_change": rdsap_change,
"heat_demand_change": heat_demand_change,
"carbon_change": carbon_change,
"sap_starting": self.record1.get(RDSAP_RESPONSE),
"sap_ending": self.record2.get(RDSAP_RESPONSE),
"heat_demand_starting": self.record1.get(HEAT_DEMAND_RESPONSE),
"heat_demand_ending": self.record2.get(HEAT_DEMAND_RESPONSE),
"carbon_starting": self.record1.get(CARBON_RESPONSE),
"carbon_ending": self.record2.get(CARBON_RESPONSE),
"potential_energy_efficiency": self.earliest_record.get("potential_energy_efficiency"),
"environment_impact_potential": self.earliest_record.get("environment_impact_potential"),
"energy_consumption_potential": self.earliest_record.get("energy_consumption_potential"),
"co2_emissions_potential": self.earliest_record.get("co2_emissions_potential"),
**ending_record,
**starting_record
}

View file

@ -3,6 +3,8 @@ import numpy as np
from tqdm import tqdm
import msgpack
from typing import List
from pathlib import Path
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
@ -10,15 +12,14 @@ from etl.epc.settings import (
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON,
CARBON_RESPONSE,
CORE_COMPONENT_FEATURES,
EFFICIENCY_FEATURES,
POTENTIAL_COLUMNS,
MINIMUM_FLOOR_HEIGHT
)
from etl.epc.DataProcessor import DataProcessor
from etl.epc.EPCRecord import EPCRecord, EPCDifferenceRecord
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
@ -28,6 +29,16 @@ from recommendations.recommendation_utils import (
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
# TODO: change in setting file
MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES]
LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
def get_cleaned_description_mapping():
"""
@ -399,7 +410,6 @@ def make_uvalues(df):
# if all_equal:
# return True
from typing import List
class EPCPipeline:
"""
This class will take a list of directories and process them to create a dataset:
@ -451,6 +461,43 @@ class EPCPipeline:
"""
pass
def compare_consecutive_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_model_data: list, all_equal_rows: list):
"""
Compare consecutive EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
if idx >= len(epc_records) - 1:
break
earliest_record: EPCRecord = epc_records[idx]
latest_record: EPCRecord = epc_records[idx + 1]
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
# Keep track of this for the moment so we can analyse
all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
continue
difference_record.append_fixed_data(fixed_data)
property_model_data.append(difference_record)
return property_model_data, all_equal_rows
def app():
# Get all the files in the directory
@ -462,8 +509,6 @@ def app():
# List all subdirectories
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
cleaning_dataset = []
# Keep track of the all equals
@ -472,41 +517,32 @@ def app():
for directory in tqdm(directories):
filepath = directory / "certificates.csv"
data_processor = DataProcessor(filepath=filepath)
epc_data_processor = EPCDataProcessor(filepath=filepath)
data_processor.pre_process()
epc_data_processor.pre_process()
df = data_processor.data
df = epc_data_processor.data
cleaning_dataset.append(data_processor.cleaning_averages)
cleaning_dataset.append(epc_data_processor.cleaning_averages)
data_by_uprn = []
for uprn, property_data in df.groupby("UPRN", observed=True):
for uprn, property_data in df.groupby("uprn", observed=True):
# Fixed features - these are property attributes that shouldn't change over time
fixed_data = {}
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or (
pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0
):
continue
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS
latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict()
mandatory_field_data = (
property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
)
# Combine all fields together
fixed_data.update(mandatory_field_data)
fixed_data.update(latest_field_data)
# Fixed features - these are property attributes that shouldn't change over time
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together
fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict()
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = property_data[
COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [
"LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
"lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
]
]
@ -516,6 +552,10 @@ def app():
epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')]
property_model_data, all_equal_rows = compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_model_data, all_equal_rows)
for idx in range(0, len(epc_records) - 1):
if idx >= len(epc_records) - 1: