Model/backend/condition/lookups/uprn_lookup_csv.py
2026-02-04 11:51:40 +00:00

23 lines
836 B
Python

import csv
from io import TextIOWrapper
from typing import BinaryIO, Dict, TextIO
from backend.condition.lookups.uprn_lookup import UprnLookup
class UprnLookupLocal(UprnLookup):
def __init__(self, csv_path: str):
self.csv_path = csv_path
def get_property_ref_to_uprn_lookup(self) -> Dict[str, int]:
with open(self.csv_path, "rb") as f:
return self.parse_csv(f)
def parse_csv(self, file_stream: BinaryIO) -> Dict[str, int]:
text_stream: TextIO = TextIOWrapper(file_stream, encoding="utf-8")
mapping: Dict[str, int] = {}
reader = csv.DictReader(text_stream)
for row in reader:
if not row["reference"] or not row["out_uprn"]:
continue
mapping[row["reference"].strip()] = int(row["out_uprn"].strip())
return mapping