Model/etl/spatial/OpenUprnClient.py
Khalim Conn-Kowlessar 802da66ce9 fixing engine api
2025-07-22 17:05:03 +01:00

205 lines
7.1 KiB
Python

import os
from tqdm import tqdm
import pandas as pd
from utils.logger import setup_logger
from utils.s3 import read_io_from_s3, save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
from backend.Property import Property
from backend.SearchEpc import SearchEpc
# Module-level logger used by the OpenUprnClient methods below.
logger = setup_logger()
class OpenUprnClient:
    """
    This client reads in the Open UPRN data from s3 which can be downloaded from here:
    https://osdatahub.os.uk/downloads/open/OpenUPRN
    This dataset contains a lookup of UPRNs to coordinates.
    Specs for this dataset can be found here:
    https://www.ordnancesurvey.co.uk/documents/product-support/tech-spec/open-uprn-techspec-v1.pdf

    The data can be split into parquet partitions named ``{min_uprn}_{max_uprn}.parquet``;
    a small metadata table (filename, lower, upper) is then used to find which
    partition holds a given UPRN without reading the full dataset.
    """

    # S3 key of the metadata table written by save_filenames_to_s3.
    FILENAME_META_KEY = "spatial/filename_meta.parquet"
    # S3 key prefix under which the partition files are read.
    SPATIAL_PREFIX = "spatial"

    def __init__(self, path, bucket, uprns=None):
        """
        :param path: S3 key (for read) or local path (for read_local) of the Open UPRN csv
        :param bucket: S3 bucket that ``path`` lives in (used by read only)
        :param uprns: Optional iterable of UPRNs (anything int() accepts) to restrict the data to.
            None or an empty iterable means "no filtering".
        """
        self.path = path
        self.bucket = bucket
        # `uprns is not None` (rather than truthiness) so array-like inputs do not
        # raise on an ambiguous bool(); an empty input still collapses to None.
        requested = [int(x) for x in uprns] if uprns is not None else []
        self.uprns = requested or None
        self.data = None
        # Mapping of partition id -> partition filename, populated by create_file_partitions.
        # Its values are what save_filenames_to_s3 writes out so that readers can
        # determine which file a UPRN's data is contained in.
        self.filenames = None

    def _filter_to_requested_uprns(self, df):
        """Return df restricted to self.uprns, or df unchanged when no UPRNs were requested."""
        if self.uprns:
            df = df[df["UPRN"].isin(self.uprns)]
        return df

    def read(self):
        """
        Read the Open UPRN csv from S3 (self.bucket / self.path) into self.data,
        keeping only the requested UPRNs if any were given.
        :return:
        """
        logger.info("Reading in open uprn data")
        df = pd.read_csv(
            read_io_from_s3(
                bucket_name=self.bucket,
                file_key=self.path
            )
        )
        self.data = self._filter_to_requested_uprns(df)

    def read_local(self):
        """
        For local testing: same as read, but treats self.path as a local csv path.
        :return:
        """
        logger.info("Reading in open uprn data")
        df = pd.read_csv(self.path)
        self.data = self._filter_to_requested_uprns(df)

    def create_file_partitions(self, partition_size=50000):
        """
        Sort self.data by UPRN and assign each row to a partition of at most
        ``partition_size`` consecutive rows. Adds ``partition`` and ``filename``
        columns to self.data and populates self.filenames.

        :param partition_size: Maximum number of rows per partition; must be positive
        :raises ValueError: if partition_size is not positive
        """
        if partition_size <= 0:
            raise ValueError("partition_size must be a positive integer")
        logger.info("Sorting data by UPRN ascending")
        self.data = self.data.sort_values("UPRN", ascending=True)
        logger.info("Creating partitions")
        # sort_values keeps the original index labels, so partition on the
        # positional rank instead of the index; otherwise partitions follow the
        # pre-sort order and their min/max UPRN ranges can overlap.
        self.data["partition"] = pd.RangeIndex(len(self.data)).to_numpy() // partition_size
        self.filenames = {}
        for partition, group in tqdm(self.data.groupby("partition")):
            min_uprn = group["UPRN"].min()
            max_uprn = group["UPRN"].max()
            self.filenames[partition] = f"{min_uprn}_{max_uprn}.parquet"
        self.data["filename"] = self.data["partition"].map(self.filenames)

    @staticmethod
    def find_filename_for_uprn(uprn, filenames):
        """
        Return the first filename whose ``{min}_{max}.parquet`` range contains uprn.

        :param uprn: UPRN to look up (anything int() accepts)
        :param filenames: Iterable of partition filenames, or a dict whose values
            are filenames (the shape of self.filenames)
        :return: The matching filename, or None if no range contains the UPRN
        """
        if isinstance(filenames, dict):
            # Iterating a dict yields its keys (partition ids), not the filenames.
            filenames = filenames.values()
        uprn = int(uprn)
        for filename in filenames:
            min_uprn, max_uprn = map(int, filename.replace(".parquet", "").split("_"))
            if min_uprn <= uprn <= max_uprn:
                return filename
        return None

    def save_filenames_to_s3(self, bucket_name):
        """
        Save the partition filenames, with their parsed lower/upper UPRN bounds,
        to s3 as a parquet table.
        :param bucket_name: Bucket to write the metadata table to
        :raises ValueError: if create_file_partitions has not been run yet
        :return:
        """
        if self.filenames is None:
            raise ValueError("No filenames to save; run create_file_partitions first")
        # S3 keys always use forward slashes, so do not build them with os.path.join.
        file_key = self.FILENAME_META_KEY
        filenames = pd.DataFrame({"filenames": list(self.filenames.values())})
        # Raw, anchored pattern: pulls both bounds out of "{lower}_{upper}.parquet".
        bounds = filenames["filenames"].str.extract(r"^(\d+)_(\d+)\.parquet$")
        filenames["lower"] = bounds[0].astype(int)
        filenames["upper"] = bounds[1].astype(int)
        logger.info("Saving filenames to s3 at {}".format(file_key))
        save_dataframe_to_s3_parquet(
            df=filenames,
            file_key=file_key,
            bucket_name=bucket_name
        )

    @staticmethod
    def make_uprn_map(uprns, uprn_filenames):
        """
        Given a list of UPRNs, this method will return a map of the filename to the
        list of UPRNs whose value falls inside that file's [lower, upper] range.

        UPRNs that fall outside every range (e.g. in a gap between two files) are
        left out of the map rather than raising.

        :param uprns: List of UPRNs (anything int() accepts)
        :param uprn_filenames: DataFrame with "filenames", "lower" and "upper" columns
        :return: dict of filename -> list of int UPRNs
        """
        lower = uprn_filenames["lower"]
        upper = uprn_filenames["upper"]
        names = uprn_filenames["filenames"]
        uprn_map = {}
        for uprn in uprns:
            uprn = int(uprn)
            matches = names[(lower <= uprn) & (upper >= uprn)]
            if matches.empty:
                # No partition covers this UPRN; nothing to read for it.
                continue
            uprn_map.setdefault(matches.iloc[0], []).append(uprn)
        return uprn_map

    @classmethod
    def _read_partition(cls, bucket_name, filename, uprns):
        """Read one partition file from bucket_name and keep only the rows for the given UPRNs."""
        spatial_data = read_dataframe_from_s3_parquet(
            bucket_name=bucket_name, file_key=f"{cls.SPATIAL_PREFIX}/{filename}"
        )
        return spatial_data[spatial_data["UPRN"].isin(uprns)]

    @classmethod
    def set_spatial_data(cls, input_properties: "list[Property]", bucket_name):
        """
        Given a list of properties, this method will set the spatial data for each property
        The method will look for the minimal set of uprn datasets that it needs to read in to get all of the spatial
        data for the properties

        Properties whose uprn_source is SearchEpc.UPRN_SOURCE_SIMULATED receive the
        placeholder frame from empty_spatial_df and are not looked up.

        :param input_properties: Properties to set spatial data on
        :param bucket_name: Bucket holding the filename metadata and the partition files
        :raises Exception: if any property still has no spatial data afterwards
        :return: The same list of properties
        """
        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=bucket_name, file_key=cls.FILENAME_META_KEY
        )
        # Simulated properties get the placeholder up front, so they are covered
        # even when no partition file ends up being read.
        properties_by_uprn = {}
        for p in input_properties:
            if p.uprn_source == SearchEpc.UPRN_SOURCE_SIMULATED:
                p.set_spatial(cls.empty_spatial_df())
            else:
                properties_by_uprn.setdefault(int(p.uprn), []).append(p)

        uprn_map = cls.make_uprn_map(list(properties_by_uprn), uprn_filenames)
        for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map)):
            # Read in the file
            spatial_df = cls._read_partition(bucket_name, filename, associated_uprn)
            for uprn in associated_uprn:
                for p in properties_by_uprn[uprn]:
                    p.set_spatial(spatial_df[spatial_df["UPRN"] == uprn])

        # Perform a final check to ensure that all properties have spatial data
        for p in input_properties:
            if p.spatial is None:
                raise Exception(f"Property with UPRN {p.uprn} does not have spatial data")
        return input_properties

    @staticmethod
    def empty_spatial_df():
        """Return a one-row placeholder spatial frame with no coordinates and all flags False."""
        return pd.DataFrame(
            [
                {
                    "X_COORDINATE": None,
                    "Y_COORDINATE": None,
                    "LATITUDE": None,
                    "LONGITUDE": None,
                    "conservation_status": False,
                    "is_listed_building": False,
                    "is_heritage_building": False,
                }
            ]
        )

    @classmethod
    def get_spatial_data(cls, uprns: "list[int]", bucket_name):
        """
        Similar method to set_spatial_data, but designed to work more generally on a list of uprns

        :param uprns: UPRNs to fetch spatial rows for
        :param bucket_name: Bucket holding the filename metadata and the partition files
        :return: DataFrame of the matching rows from every partition read; an empty
            DataFrame when no UPRN maps to a partition
        """
        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=bucket_name, file_key=cls.FILENAME_META_KEY
        )
        uprn_map = cls.make_uprn_map(uprns, uprn_filenames)
        uprn_spatial_table = [
            # Read in the file
            cls._read_partition(bucket_name, filename, associated_uprn)
            for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map))
        ]
        if not uprn_spatial_table:
            # pd.concat raises on an empty list.
            return pd.DataFrame()
        return pd.concat(uprn_spatial_table)