minor modifications to pre processor for it to work in the backend

This commit is contained in:
Khalim Conn-Kowlessar 2023-09-13 16:27:13 +01:00
parent 1fdbdbad07
commit d5bb04a091
2 changed files with 76 additions and 28 deletions

View file

@ -164,6 +164,7 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Connecting to db")
Session = sessionmaker(bind=db_engine)
session = Session()
created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
try:
session.begin()
@ -321,15 +322,17 @@ async def trigger_plan(body: PlanTriggerRequest):
# TODO: We should use the cleaned data from get_components in the data rather than the raw
# values. We should create a method in Property which takes the EPC data and inserts the cleaned
# data
epc_data = p.data.copy()
epc_data = pd.DataFrame([epc_data])
epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns]
starting_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_STARTING")
ending_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_ENDING")
fixed_data = epc_data[FIXED_FEATURES]
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.data.copy()]))
data_processor.pre_process()
starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
fixed_data = data_processor.get_fixed_features()
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["LODGEMENT_DATE_ENDING"] = datetime.now().strftime("%Y-%m-%d")
ending_epc_data["LODGEMENT_DATE_ENDING"] = created_at
scoring_map = {
'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)',
@ -357,6 +360,9 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations_scoring_data.append(scoring_dict)
# cleanup
del data_processor
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
@ -365,32 +371,21 @@ async def trigger_plan(body: PlanTriggerRequest):
cleaning_data = read_parquet_from_s3(
bucket_name=get_settings().DATA_BUCKET,
file_key="sap_change_model/cleaning_dataset.parquet",
)
cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"})
# Merge the cleaning data onto recommendations_scoring_data
).rename(columns={"local-authority": "LOCAL_AUTHORITY"})
recommendations_scoring_data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = recommendations_scoring_data[
["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
].replace("", None)
# Merge the cleaning data onto recommendations_scoring_data
# Perform the same cleaning as in the model
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"]
)
recommendations_scoring_data = recommendations_scoring_data.drop(columns=["LOCAL_AUTHORITY"])
# Note: We might need to perform the full pre-processing here
data_processor = DataProcessor(filepath=None)
data_processor.insert_data(recommendations_scoring_data)
data_processor.remap_columns()
recommendations_scoring_data = data_processor.data
).drop(columns=["LOCAL_AUTHORITY"])
# Remap column types
recommendations_scoring_data = recommendations_scoring_data.astype(columntypes)
# Store parquet file in s3 for scoring
created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format(
portfolio_id=body.portfolio_id,
timestamp=created_at

View file

@ -12,6 +12,8 @@ from model_data.simulation_system.core.Settings import (
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
COLUMNS_TO_MERGE_ON,
COMPONENT_FEATURES,
FIXED_FEATURES
)
from typing import List
@ -21,9 +23,16 @@ class DataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, filepath: Path | None) -> None:
def __init__(self, filepath: Path | None, newdata: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param newdata: Indicates if we are processing new, testing data.
In this instance, there are some operations we do not
want to perform, such as confine_data()
"""
self.filepath = filepath
self.data = None
self.newdata = newdata
def load_data(self, low_memory=False) -> None:
if not self.filepath:
@ -140,14 +149,30 @@ class DataProcessor:
break
to_index -= 1
def reformat_columns(self):
"""
This function applies the re-formattng of columns from lower case to capitalised
When requesting the epc data from the api, the columns are lower case
and separated by a hyphen, whereas in the bulk download, the columns
are capitalised and separated by underscores. If rename_columns is True
we convert the columns from lower case to capitalised format
:return:
"""
self.data.columns = [col.upper().replace("-", "_") for col in self.data.columns]
def pre_process(self) -> pd.DataFrame:
"""
Load data and begin initial cleaning
"""
if not self.data:
if self.data is None:
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.confine_data()
if self.newdata:
self.reformat_columns()
if not self.newdata:
self.confine_data()
# We have some non-standard construction age bands which we'll clean for matching
self.standardise_construction_age_band()
@ -159,9 +184,10 @@ class DataProcessor:
self.clean_multi_glaze_proportion()
self.clean_photo_supply()
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
)
if not self.newdata:
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
)
self.remap_columns()
if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
@ -178,6 +204,7 @@ class DataProcessor:
"""
# Each uprn can fille backward from recent and forward fill from oldest
# The groupby changes the order and we use the index to make the original data
filled_data = (
self.data.groupby("UPRN", group_keys=True)[columns_to_fill]
.apply(lambda group: group.fillna(method="bfill").fillna(method="ffill"))
@ -188,6 +215,11 @@ class DataProcessor:
self.data[columns_to_fill] = filled_data[columns_to_fill]
# For floor area, we also replace "" values with None
self.data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = self.data[
["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
].replace("", None)
def remap_columns(self):
"""
Remap all columns, for any non values
@ -430,3 +462,24 @@ class DataProcessor:
data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True)
return data_to_clean
def get_component_features(self, suffix: str) -> pd.DataFrame:
"""
This function will return the property components such as the walls, roof, heating etc
as well as lodgement date. These are features that we expect might change from one EPC to the
next
:param suffix: Should be one of "_STARTING" or "_ENDING"
:return: Pandas dataframe containing the subset of columns defined in COMPONENT_FEATURES
"""
if suffix not in ["_STARTING", "_ENDING"]:
raise Exception("Suffix should be one of _STARTING or _ENFING")
return self.data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix(suffix)
def get_fixed_features(self) -> pd.DataFrame:
"""
Returns the fixed features that we don't believe should vary from one EPC to the next
:return: Pandas dataframe containing the columns defined in FIXED_FEATURES
"""
return self.data[FIXED_FEATURES]