preparing filtered columns

This commit is contained in:
Khalim Conn-Kowlessar 2026-02-05 13:11:53 +00:00
parent 5fa6289b44
commit a10a3bb1aa
7 changed files with 100 additions and 34 deletions

View file

@ -1,5 +1,5 @@
import pandas as pd
from utils.s3 import read_from_s3, read_excel_from_s3
from utils.s3 import read_from_s3, read_excel_from_s3, save_csv_to_s3
class OnboarderBase:
@ -48,8 +48,11 @@ class OnboarderBase:
else:
self.data = read_from_s3(bucket_name=bucket_name, s3_file_name=file_name)
def write(self):
pass
def write(self, bucket_name: str, file_name: str):
if self.data is None:
raise ValueError("No data to write. Please run transform() before writing.")
# Store file as csv - will store in the same route location as the input file
save_csv_to_s3(dataframe=self.data, bucket_name=bucket_name, file_name=file_name)
@staticmethod
def assert_nulls_only_from_source_nulls(data: pd.DataFrame, original_column: str, mapped_column: str) -> bool:

View file

@ -1,10 +1,19 @@
import json
from pydantic import BaseModel, Field
from typing import Optional, Literal
from onboarders.factory import OnboarderFactory
from utils.logger import setup_logger
logger = setup_logger()
class OnboardingEvent(BaseModel):
s3_uri: str = Field(..., description="S3 URI of the raw ARA input file")
system: Literal["parity", "generic"] = Field(..., description="Onboarding system identifier")
format: Literal["csv", "xlsx"]
sheet_name: Optional[str] = None
def handler(event, context):
"""
Lambda handler that triggers the model engine for each SQS message.
@ -12,22 +21,27 @@ def handler(event, context):
for record in event.get("Records", []):
try:
event_body = json.loads(record["body"])
# TODO: Implement logic to check which file type we have
# Sample input data
event_body = {
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for "
"Domna.xlsx",
"system": "parity",
"format": "xlsx",
"sheet_name": "Sustainability"
}
# event_body = {
# "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for "
# "Domna.xlsx",
# "system": "parity",
# "format": "xlsx",
# "sheet_name": "Sustainability"
# }
logger.info("Processing record with body: %s", event_body)
Onboarder = OnboarderFactory.create_onboarder(event_body["system"])
onboarder = Onboarder(fileuri=event_body["s3_uri"])
logger.info("Transforming data for record with body: %s", event_body)
validated_event = OnboardingEvent(**event_body)
Onboarder = OnboarderFactory.create_onboarder(validated_event.system)
onboarder = Onboarder(
fileuri=validated_event.s3_uri,
format=validated_event.format,
sheet_name=validated_event.sheet_name
)
logger.info("Transforming data")
onboarder.transform()
logger.info("Writing data for record with body: %s", event_body)
logger.info("Writing data")
onboarder.write()
except Exception as e:
logger.error(f"Failed to process record: {e}")

View file

@ -56,4 +56,5 @@ as_built_floor_classifiers = {
unknown_as_built_floor_classifiers = {
"RetroFitted": unknown_floor_retrofitted,
"AsBuilt": unknown_floor_as_built,
"Unknown": unknown_floor_as_built,
}

View file

@ -1,4 +1,5 @@
from backend.onboarders.epc_descriptions import EpcConstructionAgeBand, EpcRoofDescriptions
from datatypes.epc.roof import EpcRoofDescriptions
from datatypes.epc.construction_age_band import EpcConstructionAgeBand
def map_flat_roof(age_band: EpcConstructionAgeBand) -> EpcRoofDescriptions:

View file

@ -1,4 +1,5 @@
from backend.onboarders.epc_descriptions import EpcConstructionAgeBand, EpcWallDescriptions
from datatypes.epc.construction_age_band import EpcConstructionAgeBand
from datatypes.epc.walls import EpcWallDescriptions
def map_cavity_wall_insulation(age_band: EpcConstructionAgeBand):

View file

@ -30,12 +30,13 @@ class ParityOnboarder(OnboarderBase):
def __init__(
self,
fileuri: str,
**kwargs
):
# Extract bucket, and filekey; Will be in the format s3://bucket/key
bucket_name = fileuri.split("/")[2]
file_name = "/".join(fileuri.split("/")[3:])
self.read_s3(bucket_name=bucket_name, file_name=file_name)
self.read_s3(bucket_name=bucket_name, file_name=file_name, **kwargs)
pass
def map_construction_age_band(self):
@ -61,20 +62,20 @@ class ParityOnboarder(OnboarderBase):
type and age band
"""
# Already resolved via direct mapping
if row.landlord_wall_description is not None:
return row.landlord_wall_description
if row.landlord_wall_construction is not None:
return row.landlord_wall_construction
wall_type = row["Wall Construction"]
# Missing construction age → conservative fallback
if pd.isnull(row.construction_age_band):
if pd.isnull(row.landlord_construction_age_band):
return wall_unknown_age_fallback.get(wall_type)
classifier = as_built_wall_classifiers.get(wall_type)
if classifier is None:
return None
return classifier(row.construction_age_band)
return classifier(row.landlord_construction_age_band)
@staticmethod
def _resolve_wall_efficiency(
@ -113,8 +114,8 @@ class ParityOnboarder(OnboarderBase):
self.data[self.landlord_wall_efficiency] = self.data.progress_apply(
lambda row: self._resolve_wall_efficiency(
row.landlord_wall_description,
row.construction_age_band,
row.landlord_wall_construction,
row.landlord_construction_age_band,
),
axis=1,
)
@ -124,8 +125,8 @@ class ParityOnboarder(OnboarderBase):
@staticmethod
def _fill_roof_as_built(row: pd.Series) -> EpcRoofDescriptions | None:
# Already resolved
if not pd.isnull(row.landlord_roof_description):
return row.landlord_roof_description
if not pd.isnull(row.landlord_roof_construction):
return row.landlord_roof_construction
roof_type = row["Roof Construction"]
@ -133,10 +134,10 @@ class ParityOnboarder(OnboarderBase):
if classifier is None:
raise NotImplementedError(f"No roof classifier for roof type '{roof_type}'")
if pd.isnull(row.construction_age_band):
if pd.isnull(row.landlord_construction_age_band):
return roof_unknown_age_fallback.get(roof_type)
output = classifier(row.construction_age_band)
output = classifier(row.landlord_construction_age_band)
if output is None:
raise NotImplementedError(
f"Roof classification returned None for roof type '{roof_type}'"
@ -180,8 +181,8 @@ class ParityOnboarder(OnboarderBase):
self.data[self.landlord_roof_efficiency] = self.data.progress_apply(
lambda row: resolve_roof_efficiency(
description=row.landlord_roof_description,
age_band=row.construction_age_band,
description=row.landlord_roof_construction,
age_band=row.landlord_construction_age_band,
insulation_thickness=row.roof_insulation_thickness_mm,
),
axis=1,
@ -190,17 +191,17 @@ class ParityOnboarder(OnboarderBase):
self.assert_no_nulls(self.data, self.landlord_roof_efficiency)
# Flag sloping ceiling
data[self.landlord_has_sloping_ceiling] = data["Roof Construction"].apply(
self.data[self.landlord_has_sloping_ceiling] = self.data["Roof Construction"].apply(
lambda x: x == "PitchedWithSlopingCeiling"
)
@staticmethod
def _fill_floor_as_built(row: pd.Series):
# 1. Already resolved
if row.landlord_floor_description is not None:
return row.landlord_floor_description
if row.landlord_floor_construction is not None:
return row.landlord_floor_construction
age_band = row.construction_age_band
age_band = row.landlord_construction_age_band
floor_type = row["Floor Construction"]
insulation = row["Floor Insulation"]
@ -281,6 +282,48 @@ class ParityOnboarder(OnboarderBase):
columns={"Total Floor Area (m2)": self.landlord_total_floor_area_m2}
)
def select_columns(self):
self.data = self.data[
[
"Org Ref",
"UPRN",
"Address 1",
"Address 2",
"Address 3",
"Postcode",
self.landlord_total_floor_area_m2,
self.landlord_construction_age_band,
self.landlord_property_type,
self.landlord_built_form,
self.landlord_wall_construction,
self.landlord_wall_efficiency,
self.landlord_roof_construction,
self.landlord_roof_efficiency,
self.landlord_has_sloping_ceiling,
self.landlord_floor_construction,
self.landlord_windows_construction,
self.landlord_windows_efficiency,
self.landlord_multi_glaze_proportion,
self.landlord_glazed_type,
self.landlord_glazed_area,
self.landlord_heating_construction,
self.landlord_heating_efficiency,
self.landlord_fuel_construction,
self.landlord_heating_controls_construction,
self.landlord_heating_controls_efficiency,
self.landlord_hot_water_system_construction,
self.landlord_hot_water_efficiency
]
].rename(
columns={
"Org Ref": "landlord_property_id",
"Address1": "address1",
"Address2": "address2",
"Address3": "address3",
"Postcode": "postcode",
}
)
def transform(self):
# ------------ construction_age_band ------------
self.map_construction_age_band()
@ -308,3 +351,6 @@ class ParityOnboarder(OnboarderBase):
# ------------ Floor Area ------------
self.map_floor_area()
# ------------ Formating ------------
self.select_columns()

View file