Model/etl/spatial/OpenUprnClient.py
Khalim Conn-Kowlessar f94cbd4385 added spatial readme
2023-10-05 14:27:00 +01:00

118 lines
3.6 KiB
Python

import os
from tqdm import tqdm
import pandas as pd
import geopandas as gpd
from utils.logger import setup_logger
from utils.s3 import read_io_from_s3, save_dataframe_to_s3_parquet
# Module-level logger used by the OpenUprnClient methods below.
logger = setup_logger()
class OpenUprnClient:
    """
    Client for the Open UPRN dataset stored in S3.

    The raw data can be downloaded from:
    https://osdatahub.os.uk/downloads/open/OpenUPRN

    The dataset is a lookup from UPRNs to coordinates. Its specification is here:
    https://www.ordnancesurvey.co.uk/documents/product-support/tech-spec/open-uprn-techspec-v1.pdf
    """

    def __init__(self, path, bucket, uprns=None):
        """
        :param path: S3 key (used by ``read``) or local file path (used by
            ``read_local``) of the Open UPRN CSV.
        :param bucket: S3 bucket name used by ``read``.
        :param uprns: optional iterable of UPRNs (anything ``int()`` accepts)
            to keep. ``None`` or an empty iterable means "no filtering".
        """
        self.path = path
        self.bucket = bucket
        # Test against None rather than truthiness so numpy arrays and pandas
        # Series are accepted (their truth value is ambiguous and raises).
        uprn_list = [int(x) for x in uprns] if uprns is not None else []
        # An empty selection is normalised to None, i.e. "no filter".
        self.uprns = uprn_list or None
        self.data = None
        # Mapping of partition number -> "<min_uprn>_<max_uprn>.parquet",
        # populated by create_file_partitions. It is used to determine which
        # partition file contains a given UPRN.
        self.filenames = None

    @staticmethod
    def _filter_uprns(df, uprns):
        """Return rows of ``df`` whose ``UPRN`` is in ``uprns``; all rows if ``uprns`` is falsy."""
        if uprns:
            return df[df["UPRN"].isin(uprns)]
        return df

    def read(self):
        """
        Read the Open UPRN CSV from S3 into ``self.data``.

        The whole CSV is loaded into memory and then, if ``self.uprns`` is
        set, filtered down to those UPRNs. This loading approach is a
        placeholder.
        """
        logger.info("Reading in open uprn data")
        df = pd.read_csv(
            read_io_from_s3(
                bucket_name=self.bucket,
                file_key=self.path,
            )
        )
        self.data = self._filter_uprns(df, self.uprns)

    def read_local(self):
        """
        Read the Open UPRN CSV from the local path ``self.path`` (for local testing).

        Filters to ``self.uprns`` when set and stores the result in ``self.data``.
        """
        logger.info("Reading in open uprn data")
        df = pd.read_csv(self.path)
        self.data = self._filter_uprns(df, self.uprns)

    @staticmethod
    def _partition_numbers(n_rows, partition_size):
        """Return an int array mapping row positions 0..n_rows-1 to consecutive blocks of ``partition_size``."""
        return pd.RangeIndex(n_rows).to_numpy() // partition_size

    def create_file_partitions(self, partition_size=50000):
        """
        Sort ``self.data`` by UPRN and split it into blocks of consecutive rows.

        Adds ``partition`` and ``filename`` columns to ``self.data`` and sets
        ``self.filenames`` to a mapping of partition number to
        ``"<min_uprn>_<max_uprn>.parquet"``.

        :param partition_size: maximum number of rows per partition (must be > 0).
        :raises RuntimeError: if no data has been read yet.
        :raises ValueError: if ``partition_size`` is not positive.
        """
        if self.data is None:
            raise RuntimeError("No data loaded; call read() or read_local() first")
        if partition_size <= 0:
            raise ValueError("partition_size must be a positive integer")
        logger.info("Sorting data by UPRN ascending")
        self.data = self.data.sort_values("UPRN", ascending=True)
        logger.info("Creating partitions")
        # Partition by sorted *position*, not index label. sort_values keeps the
        # original labels (and UPRN filtering leaves gaps), so label // size
        # would yield overlapping UPRN ranges and uneven partition sizes.
        self.data["partition"] = self._partition_numbers(len(self.data), partition_size)
        self.filenames = {}
        for partition, group in tqdm(self.data.groupby("partition")):
            min_uprn = group["UPRN"].min()
            max_uprn = group["UPRN"].max()
            self.filenames[partition] = f"{min_uprn}_{max_uprn}.parquet"
        self.data["filename"] = self.data["partition"].map(self.filenames)

    @staticmethod
    def find_filename_for_uprn(uprn, filenames):
        """
        Return the partition filename whose UPRN range contains ``uprn``.

        :param uprn: UPRN to look up (anything ``int()`` accepts, e.g. ``"123"``).
        :param filenames: iterable of ``"<min>_<max>.parquet"`` names, or a dict
            whose values are such names (e.g. ``self.filenames``).
        :return: the first filename whose inclusive range contains ``uprn``,
            or ``None`` if there is none.
        :raises ValueError: if a filename does not follow the ``<min>_<max>`` pattern.
        """
        uprn = int(uprn)
        # self.filenames is a dict keyed by partition number; iterating it
        # directly would yield the int keys rather than the names.
        if isinstance(filenames, dict):
            filenames = filenames.values()
        suffix = ".parquet"
        for filename in filenames:
            stem = filename[: -len(suffix)] if filename.endswith(suffix) else filename
            min_uprn, max_uprn = map(int, stem.split("_"))
            if min_uprn <= uprn <= max_uprn:
                return filename
        return None

    @staticmethod
    def convert_bng_data_to_gpd(df):
        """
        Convert ``df`` to a GeoDataFrame of points.

        Point geometry is built from the ``X_COORDINATE`` / ``Y_COORDINATE``
        columns, and the CRS is set to EPSG:27700 (British National Grid).
        """
        gpd_data = gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(df.X_COORDINATE, df.Y_COORDINATE),
            crs="EPSG:27700",  # British National Grid
        )
        return gpd_data

    @staticmethod
    def _build_filename_meta(filenames):
        """
        Build a DataFrame with one row per filename and its integer UPRN bounds.

        :param filenames: iterable of ``"<lower>_<upper>.parquet"`` names.
        :return: DataFrame with ``filenames``, ``lower`` and ``upper`` columns.
        """
        # Explicit object dtype so the .str accessor also works on an empty list.
        meta = pd.DataFrame({"filenames": pd.Series(list(filenames), dtype=object)})
        bounds = meta["filenames"].str.extract(r"(\d+)_(\d+)")
        # int64 explicitly: astype(int) maps to int32 on some platforms
        # (e.g. Windows with NumPy < 2), which large UPRN values could overflow.
        meta["lower"] = bounds[0].astype("int64")
        meta["upper"] = bounds[1].astype("int64")
        return meta

    def save_filenames_to_s3(self, bucket_name):
        """
        Save the partition filename metadata to S3 as parquet.

        Writes one row per partition file, with its ``lower`` and ``upper`` UPRN
        bounds, to ``spatial/filename_meta.parquet`` in ``bucket_name``.

        :param bucket_name: destination S3 bucket.
        :raises RuntimeError: if create_file_partitions has not been run.
        """
        if self.filenames is None:
            raise RuntimeError("No partitions created; call create_file_partitions() first")
        # S3 keys are always "/"-separated; os.path.join would use "\" on Windows.
        file_key = "spatial/filename_meta.parquet"
        filenames = self._build_filename_meta(self.filenames.values())
        logger.info("Saving filenames to s3 at %s", file_key)
        save_dataframe_to_s3_parquet(
            df=filenames,
            file_key=file_key,
            bucket_name=bucket_name,
        )