Added address formatting

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-02 14:27:54 +00:00
parent 40b7ec1c18
commit 19951b9ca1
3 changed files with 83 additions and 32 deletions

View file

@ -25,6 +25,28 @@ class OrdnanceSuveyClient:
self.most_relevant_result = None
self.property_type = None
self.built_form = None
# This will be postcode and address, as returned by the ordnance survey
self.address_os = None
self.postcode_os = None
def set_places_address(self):
    """
    Given a response from the places api, set the cleaned address and postcode of the property.

    Reads the "ADDRESS" and "POSTCODE" entries of ``self.most_relevant_result`` and stores:
    - ``self.address_os``: the address with the postcode removed, any trailing comma stripped,
      and each word capitalised
    - ``self.postcode_os``: the postcode in upper case

    :raises ValueError: if ``self.most_relevant_result`` has not been populated yet
    """
    import re

    if self.most_relevant_result is None:
        raise ValueError("No results found - run get_places_api first")

    raw_address = self.most_relevant_result["ADDRESS"]
    raw_postcode = self.most_relevant_result["POSTCODE"]

    # We strip out the postcode from the address as this is already stored separately,
    # then drop the trailing comma (and surrounding whitespace) this leaves behind
    address = raw_address.replace(raw_postcode, "").strip().rstrip(",").strip()

    # str.title() treats an apostrophe as a word boundary ("KING'S" -> "King'S"), so we
    # capitalise each run of letters ourselves, keeping an apostrophe inside the word.
    # [^\W\d_] matches any unicode letter, so accented characters behave as with title()
    self.address_os = re.sub(
        r"[^\W\d_]+(?:['\u2019][^\W\d_]+)?",
        lambda word: word.group(0).capitalize(),
        address,
    )

    # Make sure postcode is upper case
    self.postcode_os = raw_postcode.upper()
@lru_cache(maxsize=128)
def get_places_api(self):
@ -47,17 +69,15 @@ class OrdnanceSuveyClient:
# Extract some details about the best match
self.most_relevant_result = self.results[0]["DPA"]
self.property_type, self.built_form = self.parse_classification_code(
self.most_relevant_result["CLASSIFICATION_CODE"]
)
self.parse_classification_code(self.most_relevant_result["CLASSIFICATION_CODE"])
self.set_places_address()
return
else:
logger.info("Could not find any results for the provided address and postcode")
return {"status": response.status_code}
@staticmethod
def parse_classification_code(classification_code: str):
def parse_classification_code(self, classification_code: str):
"""
This function will convert the classification code, returned by the OS places api, to a property type that is
compatible with the EPC database.
@ -81,7 +101,5 @@ class OrdnanceSuveyClient:
}
mapped = value_map.get(classification_code, {})
property_type = mapped.get("property_type", "")
built_form = mapped.get("built_form", "")
return property_type, built_form
self.property_type = mapped.get("property_type", "")
self.built_form = mapped.get("built_form", "")

View file

@ -177,6 +177,10 @@ class SearchEpc:
self.older_epcs = None
self.full_sap_epc = None
# These are the address and postcode values, which we store in the database
self.address_clean = None
self.postcode_clean = None
self.size = size if size is not None else 25
@classmethod
@ -239,7 +243,7 @@ class SearchEpc:
# We use the direct call method inside, since we need to implement uprn as a valid
# parameter for the search function
url = os.path.join(self.client.domestic.host, "search")
response = self.client.domestic.call(method="get", url=url, params=params, size=size)
response = self.client.domestic.call(method="get", url=url, params=params)
else:
response = self.client.domestic.search(params=params, size=size)
@ -312,7 +316,24 @@ class SearchEpc:
return rows
def retrieve(self, property_type=None, address=None):
@staticmethod
def format_address(newest_epc):
    """
    Format address and postcode for storage in the database

    :param newest_epc: mapping holding at least the "address" and "postcode" entries of an EPC record
    :return: tuple of (address, postcode), where the address has the postcode removed, any trailing
        comma stripped and each word capitalised, and the postcode is upper case
    """
    import re

    raw_postcode = newest_epc["postcode"]
    raw_address = newest_epc["address"]

    # The postcode is returned separately, so remove it from the address along with
    # the trailing comma (and surrounding whitespace) this may leave behind
    address = raw_address.replace(raw_postcode, "").strip().rstrip(",").strip()

    # str.title() treats an apostrophe as a word boundary ("john's" -> "John'S"), so we
    # capitalise each run of letters ourselves, keeping an apostrophe inside the word.
    # [^\W\d_] matches any unicode letter, so accented characters behave as with title()
    address = re.sub(
        r"[^\W\d_]+(?:['\u2019][^\W\d_]+)?",
        lambda word: word.group(0).capitalize(),
        address,
    )
    postcode = raw_postcode.upper()

    return address, postcode
def extract_epc_data(self, property_type=None, address=None):
"""
Given a successful search, this method will format the data and return it
@ -338,7 +359,10 @@ class SearchEpc:
# Finally, we identify the newest epc and the rest, and then return
newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)
return newest_epc, older_epcs, full_sap_epc
# Retrieve postcode and address
address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc
@staticmethod
def filter_newest_epc(list_of_epcs: List):
@ -381,6 +405,10 @@ class SearchEpc:
Note - do we have postcodes with just a single address? We would need to use a different approach
to find the closest homes
:param property_type: This is the property type of the property we are estimating, that can be retrieved from
the ordnance survey api
:param built_form: This is the built form of the property we are estimating, that can be retrieved from
the ordnance survey api
:return:
"""
@ -400,14 +428,14 @@ class SearchEpc:
# For each record, parse the house number. We'll use this to identify the closest properties
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
# We convert the house number fo a purely numeric format - therefore
# We convert the house number to a purely numeric format - This numeric house number will be used as
# a distance weight when estimating the EPC
epc_data["numeric_house_number"] = epc_data["house_number"].apply(
lambda house_num: self.extract_numeric_housenumber_part(house_num)
)
epc_data["house_number_distance"] = abs(
epc_data["numeric_house_number"] - self.numeric_house_number
)
epc_data["house_number_distance"] = abs(epc_data["numeric_house_number"] - self.numeric_house_number)
epc_data["weight"] = 1 / epc_data["house_number_distance"]
epc_built_form = self._get_epc_mode(col="built-form", epc_data=epc_data)
epc_property_type = self._get_epc_mode(col="property-type", epc_data=epc_data)
@ -428,13 +456,8 @@ class SearchEpc:
(epc_data["built-form"] == estimation_built_form) & (epc_data["property-type"] == estimation_property_type)
]
epc_data["weight"] = 1 / epc_data["house_number_distance"]
# We use house_number_distance as a weighting where closer homes are upweighted when interpolating
# For each attribute, we need to determine the datatype and use an appropriate method
# to interpolate.
# to estimate.
estimated_epc = {}
for key, vartype in vartypes.items():
epc_data[key] = np.where(pd.isnull(epc_data[key]), None, epc_data[key])
@ -498,7 +521,9 @@ class SearchEpc:
response = self.get_epc()
if response["status"] == 200:
self.newest_epc, self.older_epcs, self.full_sap_epc = self.retrieve(address=self.full_address)
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean
) = self.extract_epc_data(address=self.full_address)
return
# Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
@ -509,11 +534,24 @@ class SearchEpc:
raise Exception("Unable to find property - investigate me")
# Step 3: Now that we have a uprn, do another check against the epc api, this time searching with the uprn
self.uprn = self.ordnance_survey_client.results[0]["DPA"]["UPRN"]
self.uprn = self.ordnance_survey_client.most_relevant_result["UPRN"]
response = self.get_epc()
if response["status"] == 200:
self.newest_epc, self.older_epcs, self.full_sap_epc = self.retrieve(address=self.full_address)
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean
) = self.extract_epc_data(address=self.ordnance_survey_client.most_relevant_result["ADDRESS"])
return
# Step 4: If we still don't have an EPC, we estimate the EPC data
estimated_epc = self.estimate_epc()
estimated_epc = self.estimate_epc(
property_type=self.ordnance_survey_client.property_type,
built_form=self.ordnance_survey_client.built_form
)
self.newest_epc = estimated_epc
self.older_epcs = []
self.full_sap_epc = {}
# Finally, set a standardised address 1 and postcode
self.address_clean = self.ordnance_survey_client.address_os
self.postcode_clean = self.ordnance_survey_client.postcode_os
return

View file

@ -2,7 +2,6 @@ from datetime import datetime
import numpy as np
import pandas as pd
from epc_api.client import EpcClient
from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
@ -60,7 +59,6 @@ async def trigger_plan(body: PlanTriggerRequest):
try:
session.begin()
logger.info("Getting the inputs")
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
uprn_filenames = read_dataframe_from_s3_parquet(
@ -73,8 +71,6 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implment validation. We should also standardise postcode and address in some fashion as
# a postcode of abcdef would be considered different to ABCDEF
epc_searcher = SearchEpc(
address1=config["address"],
@ -91,7 +87,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# if a new record was not created, we don't produce recommendations
if not is_new:
continue
# TODO: Need to add heat demand target
create_property_targets(
session,