mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Optionally inject uprn lookup into parser 🟩
This commit is contained in:
parent
2a908491b3
commit
cd24804ca2
6 changed files with 68 additions and 42 deletions
|
|
@ -1,5 +1,6 @@
|
|||
from pathlib import Path
|
||||
|
||||
from backend.condition.lookups.uprn_lookup_csv import UprnLookupLocal
|
||||
from backend.condition.processor import process_file
|
||||
|
||||
|
||||
|
|
@ -20,15 +21,19 @@ def main() -> None:
|
|||
/ "peabody"
|
||||
/ "2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx"
|
||||
)
|
||||
filepaths = [lbwf_path, peabody_path]
|
||||
peabody_uprn_lookup_path: Path = (
|
||||
path / "peabody" / "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
|
||||
)
|
||||
# filepaths = [lbwf_path, peabody_path]
|
||||
# filepaths = [lbwf_path]
|
||||
# filepaths = [peabody_path]
|
||||
filepaths = [peabody_path]
|
||||
|
||||
uprn_lookup = UprnLookupLocal(csv_path=peabody_uprn_lookup_path.as_posix())
|
||||
|
||||
for fp in filepaths:
|
||||
with fp.open("rb") as f:
|
||||
process_file(
|
||||
file_stream=f,
|
||||
source_key=fp.as_posix(),
|
||||
file_stream=f, source_key=fp.as_posix(), uprn_lookup=uprn_lookup
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,18 +1,26 @@
|
|||
from typing import Optional
|
||||
from backend.condition.domain.mapping.lbwf.lbwf_mapper import LbwfMapper
|
||||
from backend.condition.domain.mapping.mapper import Mapper
|
||||
from backend.condition.domain.mapping.peabody.peabody_mapper import PeabodyMapper
|
||||
from backend.condition.file_type import FileType
|
||||
from backend.condition.lookups.uprn_lookup import UprnLookup
|
||||
from backend.condition.parsing.parser import Parser
|
||||
from backend.condition.parsing.lbwf_parser import LbwfParser
|
||||
from backend.condition.parsing.peabody_parser import PeabodyParser
|
||||
|
||||
|
||||
def select_parser(file_type: FileType) -> Parser:
|
||||
def select_parser(
|
||||
file_type: FileType, uprn_lookup: Optional[UprnLookup] = None
|
||||
) -> Parser:
|
||||
if file_type is FileType.LBWF:
|
||||
return LbwfParser()
|
||||
|
||||
if file_type is FileType.Peabody:
|
||||
return PeabodyParser()
|
||||
if not uprn_lookup:
|
||||
raise ValueError(
|
||||
"Cannot instantiate Peabody Parser without UPRN lookup being provided"
|
||||
)
|
||||
return PeabodyParser(uprn_lookup=uprn_lookup)
|
||||
|
||||
raise ValueError("Unrecognised file type, unable to instantiate Parser")
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from typing import Any, BinaryIO, Dict, List, Optional, Tuple, DefaultDict
|
|||
from openpyxl import Workbook, load_workbook
|
||||
from collections import defaultdict
|
||||
|
||||
from backend.condition.lookups.uprn_lookup import UprnLookup
|
||||
from backend.condition.parsing.parser import Parser
|
||||
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
|
||||
PeabodyAssetCondition,
|
||||
|
|
@ -15,41 +16,39 @@ logger = setup_logger()
|
|||
|
||||
|
||||
class PeabodyParser(Parser):
|
||||
def __init__(self, uprn_lookup: UprnLookup):
|
||||
self.uprn_lookup: UprnLookup = uprn_lookup # TODO: move this to the ABC?
|
||||
|
||||
def parse(
|
||||
self,
|
||||
file_stream: BinaryIO,
|
||||
location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
|
||||
) -> Any:
|
||||
wb: Workbook = load_workbook(file_stream)
|
||||
|
||||
if location_ref_to_uprn_map is None:
|
||||
location_ref_to_uprn_map: Dict[str, int] = (
|
||||
PeabodyParser._build_location_ref_to_uprn_map()
|
||||
)
|
||||
|
||||
assets = PeabodyParser._parse_assets(wb)
|
||||
|
||||
location_ref_to_uprn_map = self.uprn_lookup.get_property_ref_to_uprn_lookup()
|
||||
|
||||
return PeabodyParser._group_assets_into_properties(
|
||||
assets=assets,
|
||||
location_ref_to_uprn_map=location_ref_to_uprn_map,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_location_ref_to_uprn_map() -> Dict[str, int]:
|
||||
location_ref_to_uprn_filepath: Path = (
|
||||
Path(__file__).resolve().parents[1]
|
||||
/ "sample_data"
|
||||
/ "peabody"
|
||||
/ "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
|
||||
)
|
||||
location_ref_to_uprn_map: Dict[str, int] = {}
|
||||
# @staticmethod
|
||||
# def _build_location_ref_to_uprn_map() -> Dict[str, int]:
|
||||
# location_ref_to_uprn_filepath: Path = (
|
||||
# Path(__file__).resolve().parents[1]
|
||||
# / "sample_data"
|
||||
# / "peabody"
|
||||
# / "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
|
||||
# ) # TODO: get this to work with lambda - include file in docker image for now?
|
||||
# location_ref_to_uprn_map: Dict[str, int] = {}
|
||||
|
||||
with location_ref_to_uprn_filepath.open(newline="") as f:
|
||||
reader: Any = csv.DictReader(f)
|
||||
for row in reader:
|
||||
location_ref_to_uprn_map[row["reference"]] = int(row["out_uprn"])
|
||||
# with location_ref_to_uprn_filepath.open(newline="") as f:
|
||||
# reader: Any = csv.DictReader(f)
|
||||
# for row in reader:
|
||||
# location_ref_to_uprn_map[row["reference"]] = int(row["out_uprn"])
|
||||
|
||||
return location_ref_to_uprn_map
|
||||
# return location_ref_to_uprn_map
|
||||
|
||||
@staticmethod
|
||||
def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from typing import Any, BinaryIO, List
|
||||
from datetime import datetime
|
||||
|
||||
from backend.condition.lookups.uprn_lookup import UprnLookup
|
||||
from utils.logger import setup_logger
|
||||
from backend.condition.domain.mapping.mapper import Mapper
|
||||
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
|
||||
|
|
@ -12,12 +13,14 @@ from backend.condition.parsing.factory import select_parser, select_mapper
|
|||
logger = setup_logger()
|
||||
|
||||
|
||||
def process_file(file_stream: BinaryIO, source_key: str) -> None:
|
||||
def process_file(
|
||||
file_stream: BinaryIO, source_key: str, uprn_lookup: UprnLookup
|
||||
) -> None:
|
||||
logger.info(f"[processor] Received file: {source_key}")
|
||||
|
||||
# Instantiation
|
||||
file_type: FileType = detect_file_type(source_key)
|
||||
parser: Parser = select_parser(file_type)
|
||||
parser: Parser = select_parser(file_type, uprn_lookup)
|
||||
mapper: Mapper = select_mapper(file_type)
|
||||
persistence = ConditionPostgres()
|
||||
|
||||
|
|
@ -41,6 +44,6 @@ def process_file(file_stream: BinaryIO, source_key: str) -> None:
|
|||
f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..."
|
||||
)
|
||||
|
||||
persistence.bulk_insert_surveys(property_condition_surveys)
|
||||
# persistence.bulk_insert_surveys(property_condition_surveys)
|
||||
|
||||
logger.info(f"[processor] Finished loading surveys to database")
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
import pytest
|
||||
|
||||
from backend.condition.lookups.uprn_lookup_csv import UprnLookupLocal
|
||||
from backend.condition.parsing.factory import select_parser
|
||||
from backend.condition.file_type import FileType
|
||||
|
||||
|
||||
def test_selects_lbwf_parser():
|
||||
# arrange
|
||||
file_type = FileType.LBWF
|
||||
|
|
@ -14,13 +16,15 @@ def test_selects_lbwf_parser():
|
|||
# assert
|
||||
assert expected_class_name == actual_class_name
|
||||
|
||||
|
||||
def test_selects_peabody_parser():
|
||||
# arrange
|
||||
file_type = FileType.Peabody
|
||||
expected_class_name = "PeabodyParser"
|
||||
uprn_lookup = UprnLookupLocal(csv_path="test")
|
||||
|
||||
# act
|
||||
actual_class_name = select_parser(file_type).__class__.__name__
|
||||
actual_class_name = select_parser(file_type, uprn_lookup).__class__.__name__
|
||||
|
||||
# assert
|
||||
assert expected_class_name == actual_class_name
|
||||
assert expected_class_name == actual_class_name
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
from tempfile import NamedTemporaryFile
|
||||
import pytest
|
||||
from typing import Any, Dict
|
||||
from io import BytesIO
|
||||
from openpyxl import Workbook
|
||||
from datetime import datetime
|
||||
|
||||
from backend.condition.lookups.uprn_lookup_csv import UprnLookupLocal
|
||||
from backend.condition.parsing.peabody_parser import PeabodyParser
|
||||
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
|
||||
PeabodyAssetCondition,
|
||||
|
|
@ -145,23 +147,28 @@ def peabody_assets_xlsx_bytes() -> BytesIO:
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def location_ref_to_uprn_map() -> Dict[str, int]:
|
||||
return {
|
||||
"B000RAND": 1,
|
||||
"B000BLOCK": 2,
|
||||
"B000FAKE": 3,
|
||||
"B000MIS": 4,
|
||||
}
|
||||
def prop_ref_uprn_csv_file() -> str:
|
||||
csv_content = """reference,out_uprn
|
||||
B000RAND,1
|
||||
B000BLOCK,2
|
||||
B000FAKE,3
|
||||
B000MIS,4
|
||||
"""
|
||||
with NamedTemporaryFile(mode="w+", delete=False, suffix=".csv") as tmp:
|
||||
tmp.write(csv_content)
|
||||
tmp.flush()
|
||||
return tmp.name
|
||||
|
||||
|
||||
def test_peabody_parser_parses_conditions(
|
||||
peabody_assets_xlsx_bytes, location_ref_to_uprn_map
|
||||
peabody_assets_xlsx_bytes, prop_ref_uprn_csv_file
|
||||
):
|
||||
# arrange
|
||||
parser = PeabodyParser()
|
||||
uprn_lookup = UprnLookupLocal(csv_path=prop_ref_uprn_csv_file)
|
||||
parser = PeabodyParser(uprn_lookup=uprn_lookup)
|
||||
|
||||
# act
|
||||
result: Any = parser.parse(peabody_assets_xlsx_bytes, location_ref_to_uprn_map)
|
||||
result: Any = parser.parse(peabody_assets_xlsx_bytes)
|
||||
|
||||
# assert
|
||||
assert len(result) == 3
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue