mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
118 lines
3.6 KiB
Python
118 lines
3.6 KiB
Python
import os
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
import geopandas as gpd
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_io_from_s3, save_dataframe_to_s3_parquet
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
class OpenUprnClient:
|
|
"""
|
|
|
|
This client reads in the Open UPRN data from s3 which can be downloaded from here:
|
|
https://osdatahub.os.uk/downloads/open/OpenUPRN
|
|
|
|
This dataset contains a lookup of UPRNs to coordinates.
|
|
|
|
Specs for this dataset can be found here:
|
|
https://www.ordnancesurvey.co.uk/documents/product-support/tech-spec/open-uprn-techspec-v1.pdf
|
|
"""
|
|
|
|
def __init__(self, path, bucket, uprns=None):
|
|
self.path = path
|
|
self.bucket = bucket
|
|
self.uprns = [int(x) for x in uprns] if uprns else None
|
|
self.data = None
|
|
|
|
# This will be stored in S3 and will be the complete list of filenames
|
|
# We'll then use this to determine which file the UPRN's data is contained in
|
|
self.filenames = None
|
|
|
|
def read(self):
|
|
"""
|
|
This methodology is placeholder, while data sits localls
|
|
:return:
|
|
"""
|
|
logger.info("Reading in open uprn data")
|
|
|
|
df = pd.read_csv(
|
|
read_io_from_s3(
|
|
bucket_name=self.bucket,
|
|
file_key=self.path
|
|
)
|
|
)
|
|
if self.uprns:
|
|
df = df[df["UPRN"].isin(self.uprns)]
|
|
|
|
self.data = df
|
|
|
|
def read_local(self):
|
|
"""
|
|
For local testing
|
|
:return:
|
|
"""
|
|
logger.info("Reading in open uprn data")
|
|
|
|
df = pd.read_csv(self.path)
|
|
if self.uprns:
|
|
df = df[df["UPRN"].isin(self.uprns)]
|
|
|
|
self.data = df
|
|
|
|
def create_file_partitions(self, partition_size=50000):
|
|
logger.info("Sorting data by UPRN ascending")
|
|
self.data = self.data.sort_values("UPRN", ascending=True)
|
|
|
|
logger.info("Creating partitions")
|
|
self.data['partition'] = self.data.index // partition_size
|
|
|
|
self.filenames = {}
|
|
for partition, group in tqdm(self.data.groupby('partition')):
|
|
min_uprn = group['UPRN'].min()
|
|
max_uprn = group['UPRN'].max()
|
|
self.filenames[partition] = f"{min_uprn}_{max_uprn}.parquet"
|
|
|
|
self.data['filename'] = self.data['partition'].map(self.filenames)
|
|
|
|
@staticmethod
|
|
def find_filename_for_uprn(uprn, filenames):
|
|
for filename in filenames:
|
|
min_uprn, max_uprn = map(int, filename.replace(".parquet", "").split("_"))
|
|
if min_uprn <= uprn <= max_uprn:
|
|
return filename
|
|
return None
|
|
|
|
@staticmethod
|
|
def convert_bng_data_to_gpd(df):
|
|
|
|
gpd_data = gpd.GeoDataFrame(
|
|
df,
|
|
geometry=gpd.points_from_xy(df.X_COORDINATE, df.Y_COORDINATE),
|
|
crs="EPSG:27700" # British National Grid
|
|
)
|
|
|
|
return gpd_data
|
|
|
|
def save_filenames_to_s3(self, bucket_name):
|
|
"""
|
|
Save the filenames to s3
|
|
:param bucket_name:
|
|
:return:
|
|
"""
|
|
file_key = os.path.join("spatial", "filename_meta.parquet")
|
|
|
|
filenames = pd.DataFrame({"filenames": list(self.filenames.values())})
|
|
filenames[['lower', 'upper']] = filenames['filenames'].str.replace('.parquet', '').str.extract(
|
|
'(\d+)_(\d+)'
|
|
)
|
|
filenames['lower'] = filenames['lower'].astype(int)
|
|
filenames['upper'] = filenames['upper'].astype(int)
|
|
|
|
logger.info("Saving filenames to s3 at {}".format(file_key))
|
|
save_dataframe_to_s3_parquet(
|
|
df=filenames,
|
|
file_key=file_key,
|
|
bucket_name=bucket_name
|
|
)
|