diff --git a/backend/onboarders/base.py b/backend/onboarders/base.py index 0e2351bd..4d09cfeb 100644 --- a/backend/onboarders/base.py +++ b/backend/onboarders/base.py @@ -1,5 +1,5 @@ import pandas as pd -from utils.s3 import read_from_s3, read_excel_from_s3 +from utils.s3 import read_from_s3, read_excel_from_s3, save_csv_to_s3 class OnboarderBase: @@ -48,8 +48,11 @@ class OnboarderBase: else: self.data = read_from_s3(bucket_name=bucket_name, s3_file_name=file_name) - def write(self): - pass + def write(self, bucket_name: str, file_name: str): + if self.data is None: + raise ValueError("No data to write. Please run transform() before writing.") + # Store file as csv - will store in the same route location as the input file + save_csv_to_s3(dataframe=self.data, bucket_name=bucket_name, file_name=file_name) @staticmethod def assert_nulls_only_from_source_nulls(data: pd.DataFrame, original_column: str, mapped_column: str) -> bool: diff --git a/backend/onboarders/handler.py b/backend/onboarders/handler.py index 0c38e4d9..dfff7788 100644 --- a/backend/onboarders/handler.py +++ b/backend/onboarders/handler.py @@ -1,10 +1,19 @@ import json +from pydantic import BaseModel, Field +from typing import Optional, Literal from onboarders.factory import OnboarderFactory from utils.logger import setup_logger logger = setup_logger() +class OnboardingEvent(BaseModel): + s3_uri: str = Field(..., description="S3 URI of the raw ARA input file") + system: Literal["parity", "generic"] = Field(..., description="Onboarding system identifier") + format: Literal["csv", "xlsx"] + sheet_name: Optional[str] = None + + def handler(event, context): """ Lambda handler that triggers the model engine for each SQS message. @@ -12,22 +21,27 @@ def handler(event, context): for record in event.get("Records", []): try: event_body = json.loads(record["body"]) - # TODO: Implement logic to check which file type we have # Sample input data - event_body = { - "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for " - "Domna.xlsx", - "system": "parity", - "format": "xlsx", - "sheet_name": "Sustainability" - } + # event_body = { + # "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for " + # "Domna.xlsx", + # "system": "parity", + # "format": "xlsx", + # "sheet_name": "Sustainability" + # } logger.info("Processing record with body: %s", event_body) - Onboarder = OnboarderFactory.create_onboarder(event_body["system"]) - onboarder = Onboarder(fileuri=event_body["s3_uri"]) - logger.info("Transforming data for record with body: %s", event_body) + validated_event = OnboardingEvent(**event_body) + Onboarder = OnboarderFactory.create_onboarder(validated_event.system) + onboarder = Onboarder( + fileuri=validated_event.s3_uri, + format=validated_event.format, + sheet_name=validated_event.sheet_name + ) + + logger.info("Transforming data") onboarder.transform() - logger.info("Writing data for record with body: %s", event_body) + logger.info("Writing data") onboarder.write() except Exception as e: logger.error(f"Failed to process record: {e}") diff --git a/backend/onboarders/mappings/parity/as_built_floor_classifiers.py b/backend/onboarders/mappings/parity/as_built_floor_classifiers.py index 05894e61..3af3c079 100644 --- a/backend/onboarders/mappings/parity/as_built_floor_classifiers.py +++ b/backend/onboarders/mappings/parity/as_built_floor_classifiers.py @@ -56,4 +56,5 @@ as_built_floor_classifiers = { unknown_as_built_floor_classifiers = { "RetroFitted": unknown_floor_retrofitted, "AsBuilt": unknown_floor_as_built, + "Unknown": unknown_floor_as_built, } diff --git a/backend/onboarders/mappings/parity/as_built_roof_classifiers.py b/backend/onboarders/mappings/parity/as_built_roof_classifiers.py index d5c883ba..fcb554bd 100644 --- a/backend/onboarders/mappings/parity/as_built_roof_classifiers.py +++ b/backend/onboarders/mappings/parity/as_built_roof_classifiers.py @@ -1,4 +1,5 @@ -from backend.onboarders.epc_descriptions import EpcConstructionAgeBand, EpcRoofDescriptions +from datatypes.epc.roof import EpcRoofDescriptions +from datatypes.epc.construction_age_band import EpcConstructionAgeBand def map_flat_roof(age_band: EpcConstructionAgeBand) -> EpcRoofDescriptions: diff --git a/backend/onboarders/mappings/parity/as_built_wall_classifiers.py b/backend/onboarders/mappings/parity/as_built_wall_classifiers.py index 124270c7..480a7e24 100644 --- a/backend/onboarders/mappings/parity/as_built_wall_classifiers.py +++ b/backend/onboarders/mappings/parity/as_built_wall_classifiers.py @@ -1,4 +1,5 @@ -from backend.onboarders.epc_descriptions import EpcConstructionAgeBand, EpcWallDescriptions +from datatypes.epc.construction_age_band import EpcConstructionAgeBand +from datatypes.epc.walls import EpcWallDescriptions def map_cavity_wall_insulation(age_band: EpcConstructionAgeBand): diff --git a/backend/onboarders/parity.py b/backend/onboarders/parity.py index 8fc5496e..2afc7a73 100644 --- a/backend/onboarders/parity.py +++ b/backend/onboarders/parity.py @@ -30,12 +30,13 @@ class ParityOnboarder(OnboarderBase): def __init__( self, fileuri: str, + **kwargs ): # Extract bucket, and filekey; Will be in the format s3://bucket/key bucket_name = fileuri.split("/")[2] file_name = "/".join(fileuri.split("/")[3:]) - self.read_s3(bucket_name=bucket_name, file_name=file_name) + self.read_s3(bucket_name=bucket_name, file_name=file_name, **kwargs) pass def map_construction_age_band(self): @@ -61,20 +62,20 @@ class ParityOnboarder(OnboarderBase): type and age band """ # Already resolved via direct mapping - if row.landlord_wall_description is not None: - return row.landlord_wall_description + if row.landlord_wall_construction is not None: + return row.landlord_wall_construction wall_type = row["Wall Construction"] # Missing construction age → conservative fallback - if pd.isnull(row.construction_age_band): + if pd.isnull(row.landlord_construction_age_band): return wall_unknown_age_fallback.get(wall_type) classifier = as_built_wall_classifiers.get(wall_type) if classifier is None: return None - return classifier(row.construction_age_band) + return classifier(row.landlord_construction_age_band) @staticmethod def _resolve_wall_efficiency( @@ -113,8 +114,8 @@ class ParityOnboarder(OnboarderBase): self.data[self.landlord_wall_efficiency] = self.data.progress_apply( lambda row: self._resolve_wall_efficiency( - row.landlord_wall_description, - row.construction_age_band, + row.landlord_wall_construction, + row.landlord_construction_age_band, ), axis=1, ) @@ -124,8 +125,8 @@ class ParityOnboarder(OnboarderBase): @staticmethod def _fill_roof_as_built(row: pd.Series) -> EpcRoofDescriptions | None: # Already resolved - if not pd.isnull(row.landlord_roof_description): - return row.landlord_roof_description + if not pd.isnull(row.landlord_roof_construction): + return row.landlord_roof_construction roof_type = row["Roof Construction"] @@ -133,10 +134,10 @@ class ParityOnboarder(OnboarderBase): if classifier is None: raise NotImplementedError(f"No roof classifier for roof type '{roof_type}'") - if pd.isnull(row.construction_age_band): + if pd.isnull(row.landlord_construction_age_band): return roof_unknown_age_fallback.get(roof_type) - output = classifier(row.construction_age_band) + output = classifier(row.landlord_construction_age_band) if output is None: raise NotImplementedError( f"Roof classification returned None for roof type '{roof_type}'" @@ -180,8 +181,8 @@ class ParityOnboarder(OnboarderBase): self.data[self.landlord_roof_efficiency] = self.data.progress_apply( lambda row: resolve_roof_efficiency( - description=row.landlord_roof_description, - age_band=row.construction_age_band, + description=row.landlord_roof_construction, + age_band=row.landlord_construction_age_band, insulation_thickness=row.roof_insulation_thickness_mm, ), axis=1, @@ -190,17 +191,17 @@ class ParityOnboarder(OnboarderBase): self.assert_no_nulls(self.data, self.landlord_roof_efficiency) # Flag sloping ceiling - data[self.landlord_has_sloping_ceiling] = data["Roof Construction"].apply( + self.data[self.landlord_has_sloping_ceiling] = self.data["Roof Construction"].apply( lambda x: x == "PitchedWithSlopingCeiling" ) @staticmethod def _fill_floor_as_built(row: pd.Series): # 1. Already resolved - if row.landlord_floor_description is not None: - return row.landlord_floor_description + if row.landlord_floor_construction is not None: + return row.landlord_floor_construction - age_band = row.construction_age_band + age_band = row.landlord_construction_age_band floor_type = row["Floor Construction"] insulation = row["Floor Insulation"] @@ -281,6 +282,48 @@ class ParityOnboarder(OnboarderBase): columns={"Total Floor Area (m2)": self.landlord_total_floor_area_m2} ) + def select_columns(self): + self.data = self.data[ + [ + "Org Ref", + "UPRN", + "Address 1", + "Address 2", + "Address 3", + "Postcode", + self.landlord_total_floor_area_m2, + self.landlord_construction_age_band, + self.landlord_property_type, + self.landlord_built_form, + self.landlord_wall_construction, + self.landlord_wall_efficiency, + self.landlord_roof_construction, + self.landlord_roof_efficiency, + self.landlord_has_sloping_ceiling, + self.landlord_floor_construction, + self.landlord_windows_construction, + self.landlord_windows_efficiency, + self.landlord_multi_glaze_proportion, + self.landlord_glazed_type, + self.landlord_glazed_area, + self.landlord_heating_construction, + self.landlord_heating_efficiency, + self.landlord_fuel_construction, + self.landlord_heating_controls_construction, + self.landlord_heating_controls_efficiency, + self.landlord_hot_water_system_construction, + self.landlord_hot_water_efficiency + ] + ].rename( + columns={ + "Org Ref": "landlord_property_id", + "Address1": "address1", + "Address2": "address2", + "Address3": "address3", + "Postcode": "postcode", + } + ) + def transform(self): # ------------ construction_age_band ------------ self.map_construction_age_band() @@ -308,3 +351,6 @@ class ParityOnboarder(OnboarderBase): # ------------ Floor Area ------------ self.map_floor_area() + + # ------------ Formating ------------ + self.select_columns() diff --git a/backend/onboarders/requirements.txt b/backend/onboarders/requirements.txt new file mode 100644 index 00000000..e69de29b