Merge branch 'etl-michael' of github.com:Hestia-Homes/Model into etl-michael

This commit is contained in:
Michael Duong 2023-12-05 11:33:42 +00:00
commit 2845badbc0
2 changed files with 89 additions and 27 deletions

View file

@ -67,16 +67,17 @@ class DataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, filepath: Path | None, newdata: bool = False) -> None:
def __init__(self, filepath: Path | None, is_newdata: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param newdata: Indicates if we are processing new, testing data.
:param is_newdata: Indicates if we are processing new, testing data.
In this instance, there are some operations we do not
want to perform, such as confine_data()
"""
self.filepath = filepath
self.data = None
self.newdata = newdata
self.cleaning_averages = None
self.is_newdata = is_newdata
def load_data(self, low_memory=False) -> None:
if not self.filepath:
@ -130,6 +131,7 @@ class DataProcessor:
TODO: We could use a model based impution approach for possibly more accurate cleaning
"""
# TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning)
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0])
def apply_clean(data, matching_columns):
@ -174,13 +176,13 @@ class DataProcessor:
if self.data is None:
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
if not self.newdata:
if not self.is_newdata:
self.confine_data()
self.remap_columns()
# We have some non-standard construction age bands which we'll clean for matching
if not self.newdata:
if not self.is_newdata:
self.standardise_construction_age_band()
self.clean_missing_rooms()
@ -188,12 +190,12 @@ class DataProcessor:
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
)
if not self.newdata:
if not self.is_newdata:
self.clean_multi_glaze_proportion()
self.clean_photo_supply()
if not self.newdata:
if not self.is_newdata:
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
)
@ -202,24 +204,37 @@ class DataProcessor:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
if not self.newdata:
if not self.is_newdata:
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# Final re-casting after data transformed and prepared
coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.newdata else COLUMNTYPES
coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES
for k, v in coltypes.items():
self.data[k] = self.data[k].astype(v)
self.data = self.data.astype(coltypes)
self.na_remapping()
return self.data
if not self.is_newdata:
# We have some odd cases with missing constituency so we fill
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
self.cleaning_averages = self.make_cleaning_averages()
# We apply averages cleaning to the data
self.data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
)
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
def na_remapping(self):
fill_na_map_apply = {
k: v for k, v in fill_na_map.items() if k in self.data.columns
} if self.newdata else fill_na_map
} if self.is_newdata else fill_na_map
for column, fill_value in fill_na_map_apply.items():
self.data[column] = self.data[column].fillna(fill_value)
@ -264,7 +279,7 @@ class DataProcessor:
data = data.replace(np.NAN, None)
# Remap certain columns
if not self.newdata:
if not self.is_newdata:
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)

View file

@ -397,6 +397,63 @@ def compare_records(earliest_record: pd.Series, latest_record: pd.Series, column
if all_equal:
return True
from dataclasses import dataclass
@dataclass
class EPCRecord:
"""
Base class for a EPC record
"""
WALLS_DESCRIPTION: str
FLOOR_DESCRIPTION: str
LIGHTING_DESCRIPTION: str
ROOF_DESCRIPTION: str
MAINHEAT_DESCRIPTION: str
HOTWATER_DESCRIPTION: str
MAIN_FUEL: str
MECHANICAL_VENTILATION: str
SECONDHEAT_DESCRIPTION: str
WINDOWS_DESCRIPTION: str
GLAZED_TYPE: str
MULTI_GLAZE_PROPORTION: float
LOW_ENERGY_LIGHTING: float
NUMBER_OPEN_FIREPLACES: float
MAINHEATCONT_DESCRIPTION: str
SOLAR_WATER_HEATING_FLAG: str
PHOTO_SUPPLY: float
TRANSACTION_TYPE: str
ENERGY_TARIFF: str
EXTENSION_COUNT: float
TOTAL_FLOOR_AREA: float
FLOOR_HEIGHT: float
HOT_WATER_ENERGY_EFF: str
FLOOR_ENERGY_EFF: str
WINDOWS_ENERGY_EFF: str
WALLS_ENERGY_EFF: str
SHEATING_ENERGY_EFF: str
ROOF_ENERGY_EFF: str
MAINHEAT_ENERGY_EFF: str
MAINHEATC_ENERGY_EFF: str
LIGHTING_ENERGY_EFF: str
POTENTIAL_ENERGY_EFFICIENCY: float
ENVIRONMENT_IMPACT_POTENTIAL: float
ENERGY_CONSUMPTION_POTENTIAL: float
CO2_EMISSIONS_POTENTIAL: float
LODGEMENT_DATE: str
CURRENT_ENERGY_EFFICIENCY: int
ENERGY_CONSUMPTION_CURRENT: int
CO2_EMISSIONS_CURRENT: float
# def __init__(self, num) -> None:
# self.num = num
# def __sub__(self, other):
# return self.num - other.num
test = EPCRecord(10)
test2 = EPCRecord(20)
test - test2
def app():
# Get all the files in the directory
@ -419,18 +476,11 @@ def app():
data_processor = DataProcessor(filepath=filepath)
df = data_processor.pre_process()
data_processor.pre_process()
df = data_processor.data
cleaning_averages = data_processor.make_cleaning_averages()
# We have some odd cases with missing constituency so we fill
df = df.fillna({"CONSTITUENCY": df["CONSTITUENCY"].mode().values[0]})
df = DataProcessor.apply_averages_cleaning(
data_to_clean=df,
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
)
cleaning_dataset.append(data_processor.cleaning_averages)
data_by_urpn = []
for uprn, property_data in df.groupby("UPRN", observed=True):
@ -466,7 +516,7 @@ def app():
# e.g. first vs second, second vs third and also first vs third
property_model_data = []
for idx in range(0, property_data.shape[0] - 1):
if idx >= property_data.shape[0] - 1:
break
@ -592,9 +642,6 @@ def app():
dataset.append(data_by_urpn_df)
cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0]
cleaning_dataset.append(cleaning_averages)
print("Final all equal count: %s" % str(len(all_equal_rows)))
# Store cleaning dataset in s3 as a parquet file