Model/backend/condition/lookups/uprn_lookup_s3.py
2026-02-04 15:14:26 +00:00

29 lines
1,004 B
Python

import csv
from io import BytesIO, TextIOWrapper
from typing import BinaryIO, Dict, TextIO
from backend.condition.lookups.uprn_lookup import UprnLookup
from utils.s3 import read_io_from_s3
class UprnLookupS3(UprnLookup):
    """Build a property-reference -> UPRN mapping from a CSV object in S3.

    The CSV must have a header row containing at least the ``reference``
    and ``out_uprn`` columns; any other columns are ignored.
    """

    def __init__(self, bucket: str = "", key: str = ""):
        """Remember which S3 object holds the lookup CSV.

        Args:
            bucket: Bucket name, later passed to ``read_io_from_s3`` as
                ``bucket_name``.
            key: Object key, later passed to ``read_io_from_s3`` as
                ``file_key``.
        """
        self.bucket = bucket
        self.key = key

    def get_property_ref_to_uprn_lookup(self) -> Dict[str, int]:
        """Fetch the configured S3 object and parse it into a mapping.

        Returns:
            A dict mapping each stripped ``reference`` value to its
            ``out_uprn`` value converted to ``int``.
        """
        file_bytes: BytesIO = read_io_from_s3(
            bucket_name=self.bucket, file_key=self.key
        )
        return self._parse_csv_bytes(file_bytes)

    def _parse_csv_bytes(self, file_stream: BinaryIO) -> Dict[str, int]:
        """Parse a binary CSV stream into a ``reference -> out_uprn`` dict.

        Rows whose ``reference`` or ``out_uprn`` is missing, empty, or
        whitespace-only are skipped. If a reference occurs more than once,
        the last occurrence wins.

        Args:
            file_stream: Binary stream holding UTF-8 encoded CSV text, with
                or without a leading byte-order mark.

        Returns:
            Mapping of stripped reference string to integer UPRN.

        Raises:
            KeyError: If the header lacks ``reference`` or ``out_uprn``.
            ValueError: If a non-blank ``out_uprn`` is not a valid integer.
        """
        # "utf-8-sig" transparently drops a leading BOM (common in CSVs
        # exported from spreadsheet tools), which would otherwise become part
        # of the first header name and break the "reference" lookup.
        # newline="" is what the csv module expects so that it can handle
        # line endings embedded in quoted fields itself.
        text_stream: TextIO = TextIOWrapper(
            file_stream, encoding="utf-8-sig", newline=""
        )
        mapping: Dict[str, int] = {}
        for row in csv.DictReader(text_stream):
            # DictReader fills cells missing from a short row with None, so
            # normalise to "" before stripping. Strip *before* the emptiness
            # test so whitespace-only cells are skipped rather than reaching
            # int("") or being stored under an empty key.
            reference = (row["reference"] or "").strip()
            uprn = (row["out_uprn"] or "").strip()
            if not reference or not uprn:
                continue
            mapping[reference] = int(uprn)
        return mapping