import logging

import numpy as np
import pandas as pd

from backend.addresses.Addresses import Addresses
from backend.app.config import get_settings
from utils.s3 import read_csv_from_s3, read_excel_from_s3


class PlanInputProcessor:
    """Load a plan trigger file from S3 and turn it into ``Addresses``.

    The request ``body`` supplies ``file_type``, ``file_format``,
    ``trigger_file_path`` and (for Excel input) ``sheet_name``. Optional
    ``index_start`` / ``index_end`` attributes restrict processing to a
    slice of the loaded records.

    After :meth:`process` has run, ``valuation_data`` holds one
    ``{"uprn": ..., "valuation": ...}`` dict per record that carries a
    ``domna_valuation`` value (``domna_asset_list`` format only).
    """

    # Excel file formats that can be processed, mapped to the name of the
    # method that normalises the loaded DataFrame into a list of records.
    _XLSX_FORMAT_HANDLERS = {
        "domna_asset_list": "_process_domna_asset_list",
        "ara_property_list": "_process_ara_property_list",
    }

    # Lower-case source values -> display values. Values that are not listed
    # are left untouched by the normalisation step.
    _PROPERTY_TYPE_MAP = {
        "house": "House",
        "flat": "Flat",
        "maisonette": "Maisonette",
        "bungalow": "Bungalow",
        "block house": "House",
        "coach house": "House",
        "bedsit": "Flat",
    }
    _BUILT_FORM_MAP = {
        "mid-terrace": "Mid-Terrace",
        "end-terrace": "End-Terrace",
        "semi-detached": "Semi-Detached",
        "detached": "Detached",
        "enclosed end-terrace": "Enclosed End-Terrace",
        "enclosed mid-terrace": "Enclosed Mid-Terrace",
    }

    def __init__(self, body):
        """Store the request body and initialise empty processing state.

        Args:
            body: Request object describing the trigger file. ``index_start``
                and ``index_end`` are read if present and default to ``None``.
        """
        self.body = body
        self.logger = logging.getLogger(__name__)
        self.plan_input = None
        self.valuation_data = []
        self.index_start = getattr(body, "index_start", None)
        self.index_end = getattr(body, "index_end", None)

    def process(self):
        """Load, normalise and slice the plan input, then build addresses.

        A ``file_type`` of ``"xlsx"`` is read as Excel and normalised
        according to ``file_format``; any other ``file_type`` is read as CSV.

        Returns:
            Whatever ``Addresses.from_plan_input`` returns for the loaded
            records and the request body.

        Raises:
            ValueError: If ``file_type`` is ``"xlsx"`` and ``file_format`` is
                not one of the supported Excel formats.
        """
        if self.body.file_type == "xlsx":
            # Reject unsupported formats before fetching the workbook so a
            # bad request does not cost an S3 download.
            handler_name = self._XLSX_FORMAT_HANDLERS.get(self.body.file_format)
            if handler_name is None:
                raise ValueError("Other formats not yet supported")

            self.logger.info("Getting the plan input")
            self.plan_input = read_excel_from_s3(
                bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
                file_key=self.body.trigger_file_path,
                sheet_name=self.body.sheet_name,
                header_row=0,
            )
            self.logger.info("Got the plan input from excel")
            getattr(self, handler_name)()
        else:
            self.logger.info("Getting the plan input from csv")
            self.plan_input = read_csv_from_s3(
                bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
                filepath=self.body.trigger_file_path,
            )
            self.logger.info("Got the plan input from csv")

        # Slice only when both bounds are supplied.
        has_start = self.index_start is not None
        has_end = self.index_end is not None
        if has_start and has_end:
            self.plan_input = self.plan_input[self.index_start:self.index_end]
        elif has_start or has_end:
            # A half-specified range is not applied; say so instead of
            # silently processing the whole file.
            self.logger.warning(
                "Ignoring partial index range (index_start=%s, index_end=%s); "
                "both bounds are required to slice the plan input",
                self.index_start,
                self.index_end,
            )

        self._extract_valuation_data()
        return self.to_addresses()

    @staticmethod
    def _is_missing(value):
        """Return True for ``None`` and pandas/NumPy missing scalars (NaN, NaT, NA)."""
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def _extract_valuation_data(self):
        """Collect ``domna_valuation`` values into ``self.valuation_data``.

        Only applies to the ``domna_asset_list`` format, and only when the
        first record has a ``domna_valuation`` key. Records whose valuation
        is missing are skipped.
        """
        # NOTE(review): assumes ``plan_input`` is a list of dict records at
        # this point (true after the Excel handlers; presumably also what
        # ``read_csv_from_s3`` returns -- confirm).
        if self.body.file_format != "domna_asset_list" or not self.plan_input:
            return
        if "domna_valuation" not in self.plan_input[0]:
            return

        valuations = []
        for record in self.plan_input:
            valuation = record.get("domna_valuation")
            # Records built via DataFrame.to_dict carry empty cells as NaN,
            # not None, so a plain ``is not None`` check lets them through.
            if self._is_missing(valuation):
                continue
            valuations.append({"uprn": record.get("uprn"), "valuation": valuation})
        self.valuation_data = valuations
        # Could add more formats here in future

    def _process_domna_asset_list(self):
        """Normalise a ``domna_asset_list`` DataFrame into a list of records.

        Renames the address/postcode/UPRN columns, blanks out UPRNs on
        estimated rows where the UPRN is negative or null, derives
        ``property_type`` and ``built_form`` from the landlord columns
        (falling back to the EPC columns where the landlord value is
        ``"unknown"``), maps known values to their display form, and stores
        the result in ``self.plan_input`` as ``to_dict("records")`` output.
        """
        df = self.plan_input
        df = df.rename(
            columns={
                "domna_address_1": "address",
                "domna_postcode": "postcode",
                "epc_os_uprn": "uprn",
            }
        )

        if "estimated" not in df.columns:
            df["estimated"] = False
        # An estimated row with a negative or null UPRN has no usable UPRN.
        df["uprn"] = np.where(
            df["estimated"].isin([1, True]) & ((df["uprn"] < 0) | pd.isnull(df["uprn"])),
            None,
            df["uprn"],
        )

        df["property_type"] = df["landlord_property_type"].copy()
        if "landlord_built_form" in df.columns:
            df["built_form"] = df["landlord_built_form"].copy()
        else:
            df["built_form"] = None

        # Fall back to the EPC-derived value where the landlord value is "unknown".
        if "epc_property_type" not in df.columns:
            df["epc_property_type"] = None
        df["property_type"] = np.where(
            df["property_type"] == "unknown", df["epc_property_type"], df["property_type"]
        )
        if "epc_archetype" not in df.columns:
            df["epc_archetype"] = None
        df["built_form"] = np.where(
            df["built_form"] == "unknown", df["epc_archetype"], df["built_form"]
        )

        # Map known values to display form; unmapped values keep their original text.
        df["property_type"] = (
            df["property_type"].map(self._PROPERTY_TYPE_MAP).fillna(df["property_type"])
        )
        df["built_form"] = df["built_form"].map(self._BUILT_FORM_MAP).fillna(df["built_form"])

        self.plan_input = df.to_dict("records")

    def _process_ara_property_list(self):
        """Convert an ``ara_property_list`` DataFrame to a list of records unchanged."""
        df = self.plan_input
        self.plan_input = df.to_dict("records")

    def to_addresses(self):
        """Build addresses from the processed plan input and the request body."""
        return Addresses.from_plan_input(self.plan_input, self.body)