add notes

This commit is contained in:
Michael Duong 2023-12-08 20:07:48 +00:00
parent 7a2c2fff15
commit aa998b3b71
6 changed files with 296 additions and 32 deletions

View file

@ -145,27 +145,31 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations[p.id] = property_recommendations
# TODO: p.get_model_data() -> EPCRecord
# Finally, we'll prepare data for predicting the impact on SAP
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
# TODO: Temp
if data_processor.data["UPRN"].values[0] == "":
data_processor.data["UPRN"] = 0
# data_processor = DataProcessor(None, newdata=True)
# data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
# # TODO: Temp
# if data_processor.data["UPRN"].values[0] == "":
# data_processor.data["UPRN"] = 0
data_processor.pre_process()
# data_processor.pre_process()
starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
fixed_data = data_processor.get_fixed_features()
# starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
# ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
# fixed_data = data_processor.get_fixed_features()
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
# # We update the ending record with the recommended updates and we set lodgement date to today
# ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
property_scoring_data[p.id] = {
"starting_epc_data": starting_epc_data,
"ending_epc_data": ending_epc_data,
"fixed_data": fixed_data
}
# TODO: EPCRecord - AdjustedEPCRecord
# property_scoring_data[p.id] = {
# "starting_epc_data": starting_epc_data,
# "ending_epc_data": ending_epc_data,
# "fixed_data": fixed_data
# }
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
@ -180,7 +184,7 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations_scoring_data.append(scoring_dict)
# cleanup
del data_processor
# del data_processor
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)

View file

@ -67,6 +67,106 @@ class DataProcessor:
Handle data loading and data preprocessing
"""
training_pipeline = {
"load_data": {
"function": "load_data",
"args": [],
"kwargs": {"low_memory": DATA_PROCESSOR_SETTINGS["low_memory"]},
},
"confine_data": {
"function": "confine_data",
"args": [],
"kwargs": {},
},
"remap_columns": {
"function": "remap_columns",
"args": [],
"kwargs": {},
},
"standardise_construction_age_band": {
"function": "standardise_construction_age_band",
"args": [],
"kwargs": {},
},
"clean_missing_rooms": {
"function": "clean_missing_rooms",
"args": [],
"kwargs": {},
},
"recast_df_columns": {
"function": "recast_df_columns",
"args": [],
"kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]},
},
"clean_multi_glaze_proportion": {
"function": "clean_multi_glaze_proportion",
"args": [],
"kwargs": {},
},
"clean_photo_supply": {
"function": "clean_photo_supply",
"args": [],
"kwargs": {},
},
"retain_multiple_epc_properties": {
"function": "retain_multiple_epc_properties",
"args": [],
"kwargs": {"epc_minimum_count": DATA_PROCESSOR_SETTINGS["epc_minimum_count"]},
},
"fill_na_fields": {
"function": "fill_na_fields",
"args": [],
"kwargs": {"columns_to_fill": COLUMNS_TO_MERGE_ON},
},
"cleaning_averages": {
"function": "make_cleaning_averages",
"args": [],
"kwargs": {},
},
"apply_averages_cleaning": {
"function": "apply_averages_cleaning",
"args": [],
"kwargs": {
"data_to_clean": "data",
"cleaning_data": "cleaning_averages",
"cols_to_merge_on": COLUMNS_TO_MERGE_ON,
},
},
"na_remapping": {
"function": "na_remapping",
"args": [],
"kwargs": {},
},
}
newdata_pipeline = {
"remap_columns": {
"function": "remap_columns",
"args": [],
"kwargs": {},
},
"recast_df_columns": {
"function": "recast_df_columns",
"args": [],
"kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]},
},
"clean_multi_glaze_proportion": {
"function": "clean_multi_glaze_proportion",
"args": [],
"kwargs": {},
},
"clean_photo_supply": {
"function": "clean_photo_supply",
"args": [],
"kwargs": {},
},
"na_remapping": {
"function": "na_remapping",
"args": [],
"kwargs": {},
},
}
def __init__(self, filepath: Path | None, is_newdata: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
@ -74,11 +174,47 @@ class DataProcessor:
In this instance, there are some operations we do not
want to perform, such as confine_data()
"""
self.data : pd.DataFrame = None
self.cleaning_averages : pd.DataFrame = None
self.filepath = filepath
self.data = None
self.cleaning_averages = None
self.pipeline_steps = self.pipeline_factory("newdata" if is_newdata else "training")
self.is_newdata = is_newdata
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
def pre_process_pipeline(self) -> None:
"""
For the pipeline_steps, we apply each function in turn
"""
for step in self.pipeline_steps:
step_function = getattr(self, self.pipeline_steps[step]["function"])
step_args = self.pipeline_steps[step]["args"]
step_kwargs = self.pipeline_steps[step]["kwargs"]
if step_args:
step_function(*step_args, **step_kwargs)
else:
step_function(**step_kwargs)
def pipeline_factory(self, pipeline_type: str) -> dict:
"""
Determine which dataclient to use
"""
pipelines = {
"training": self.training_pipeline,
"newdata": self.newdata_pipeline,
}
if pipeline_type not in pipelines:
raise ValueError("Pipeline type specified is not in factory")
return pipelines[pipeline_type]
def load_data(self, low_memory=False) -> None:
if not self.filepath:
raise ValueError("No filepath specified")

View file

@ -1,22 +1,50 @@
import pandas as pd
from typing import List
from etl.epc.EPCRecord import EPCDifferenceRecord
from ValidationConfiguration import DatasetValidationConfiguration
from etl.epc.settings import EARLIEST_EPC_DATE
class TrainingDataset:
class BaseDataset:
"""
Base class for all datasets
"""
def __init__(self) -> None:
self.pipeline_steps = {}
def validate_dataset(self):
"""
Validate the dataset against the validation configuration
"""
self.dataset_validation: dict = DatasetValidationConfiguration
def pipeline_factory(self, pipeline_type: str) -> dict:
"""
Factory method for creating a pipeline
"""
if pipeline_type not in self.pipeline_steps:
raise ValueError(f"Pipeline type {pipeline_type} not found")
return self.pipeline_steps[pipeline_type]
class TrainingDataset(BaseDataset):
"""
A collection of EPCDifferenceRecords can be combined into a TrainingDataset.
"""
def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
self.pipeline_steps = self.pipeline_factory("training")
self.datasets = datasets
self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])
# TODO: replace these with the pipeline steps
self._feature_generation()
self._drop_features()
self._clean_dataframe()
# self._clean_dataframe()
self._clean_efficiency_variables()
self._null_validation(information="Clean Efficiency Variables")
self._process_and_prune()
# self._process_and_prune()
self._clean_missing_values()
self._null_validation(information="Clean Missing Values")
@ -56,7 +84,7 @@ class TrainingDataset:
self.df["DAYS_TO_STARTING"] = self._calculate_days_to(self.df["LODGEMENT_DATE_STARTING"])
self.df["DAYS_TO_ENDING"] = self._calculate_days_to(self.df["LODGEMENT_DATE_ENDING"])
def _clean_efficiency_variables(self, df):
def _clean_efficiency_variables(self):
"""
These is scope to clean this by the model per corresponding description.
@ -88,7 +116,7 @@ class TrainingDataset:
if isinstance(lodgement_date, str):
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).daye
).days
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
@ -108,18 +136,19 @@ class TrainingDataset:
else:
return self.__add__(other)
class ScoringDataset:
class NewDataset(BaseDataset):
"""
A collection of EPCDifferenceRecords can be combined into a ScoringDataset.
"""
def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
self.pipeline_steps = self.pipeline_factory("newdata")
self.datasets = datasets
def __add__(self, other) -> "ScoringDataset":
if not isinstance(other, ScoringDataset):
def __add__(self, other) -> "NewDataset":
if not isinstance(other, NewDataset):
raise TypeError("Addition can only be performed with another instance of ScoringDataset")
return ScoringDataset(self.datasets + other.datasets)
return NewDataset(self.datasets + other.datasets)
def __radd__(self, other):
"""

View file

@ -19,6 +19,12 @@ class EPCRecord:
"""
Base class for a EPC record
"""
# TODO: lower case and underscore
walls = None
floor = None
roof = None
UPRN: str
WALLS_DESCRIPTION: str
FLOOR_DESCRIPTION: str
@ -60,6 +66,10 @@ class EPCRecord:
ENERGY_CONSUMPTION_CURRENT: int
CO2_EMISSIONS_CURRENT: float
u_values_walls = None
u_values_roof = None
u_values_floor = None
def __post_init__(self):
# We can have validation and cleaning steps for each of the fields
# self.WALLS_DESCRIPTION = 'check'
@ -67,8 +77,31 @@ class EPCRecord:
self.validation_configuration = EPCRecordValidationConfiguration
# self._field_validation()
self._clean_records()
self._expand_description()
self._generate_uvalues()
self._validate_expanded_description()
self._validate_u_values()
# etc
pass
def _expand_description(self):
# TODO: can be loop over all the descriptions, or done in one
pass
def _clean_records(self):
"""
This method will clean the records
"""
# self._clean_potential_energy_efficiency()
# self._clean_environment_impact_potential()
# self._clean_energy_consumption_potential()
# self._clean_co2_emissions_potential()
# self._clean_current_energy_efficiency()
# self._clean_energy_consumption_current()
# self._clean_co2_emissions_current()
def _field_validation(self):
"""
This method will validate each of the fields in the EPC record
@ -194,8 +227,10 @@ class EPCDifferenceRecord:
"""
self.record1 = record1
self.record2 = record2
self.flag_fabric_consistency = False
self.difference_record = {}
self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration
self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration
@ -204,7 +239,7 @@ class EPCDifferenceRecord:
self._construct_difference_record()
self._validate_difference_record()
self._detect_fabric_consistency()
def _construct_difference_record(self):
@ -220,7 +255,7 @@ class EPCDifferenceRecord:
ending_record = self.record2.get(component_variables + ["LODGEMENT_DATE"], return_asdict=True, key_suffix="_ENDING")
starting_record = self.record1.get(component_variables + ["LODGEMENT_DATE"], return_asdict=True, key_suffix="_STARTING")
# TODO: DO we want to take the earliest potentials or max potentials?
# TODO: Take the earliest potentials
self.difference_record = {
"UPRN": self.record1.get("UPRN"),
"RDSAP_CHANGE": rdsap_change,

View file

@ -54,4 +54,8 @@ EPCDifferenceRecordFixedDataValidationConfiguration = {
"type": "string",
"acceptable_values": []
}
}
DatasetValidationConfiguration = {
}

View file

@ -399,6 +399,58 @@ def make_uvalues(df):
# if all_equal:
# return True
from typing import List
class EPCPipeline:
"""
This class will take a list of directories and process them to create a dataset:
- Load the data
- Pre-process the data
- Create a dataset
- Clean the dataset
- Store the dataset
"""
def __init__(self, directories: List[Path]):
self.directories = directories
self.dataset = []
self.cleaning_dataset = []
self.all_equal_rows = []
def load_data(self):
"""
Load the data from the directories
:return:
"""
for directory in self.directories:
filepath = directory / "certificates.csv"
data_processor = DataProcessor(filepath=filepath)
data_processor.pre_process()
self.dataset.append(data_processor.data)
def create_dataset(self):
"""
Create a dataset from the data
:return:
"""
pass
def clean_dataset(self):
"""
Clean the dataset
:return:
"""
pass
def store_dataset(self):
"""
Store the dataset
:return:
"""
pass
def app():
# Get all the files in the directory
@ -410,6 +462,8 @@ def app():
# List all subdirectories
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
cleaning_dataset = []
# Keep track of the all equals
@ -606,8 +660,8 @@ def app():
data_by_urpn_df = process_and_prune_desriptions(data_by_urpn_df, cleaned_lookup)
# Apply u-values
for col in ["walls_clean_description", "walls_clean_description_ENDING"]:
data_by_urpn_df[col] = data_by_urpn_df[col].str.replace("(assumed)", "").str.rstrip()
# for col in ["walls_clean_description", "walls_clean_description_ENDING"]:
# data_by_urpn_df[col] = data_by_urpn_df[col].str.replace("(assumed)", "").str.rstrip()
data_by_urpn_df = make_uvalues(data_by_urpn_df).drop(
columns=["walls_clean_description", "walls_clean_description_ENDING"]
@ -638,11 +692,13 @@ def app():
# TODO: move into difference record
# Remove any records that have huge swings in their floor area
# Move this into TrainingDataset as this won't be run in newdata
output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"])
output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"]
output = output[output["tfa_diff_prop"] < 0.5]
output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
# TODO: move into EPCRecord record
uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col]
for uvalue_col in uvalue_columns:
output[uvalue_col] = pd.to_numeric(output[uvalue_col])