Merge pull request #695 from Hestia-Homes/feature/conditions-database-write

Feature/Condition - write data to postgres
This commit is contained in:
Daniel Roth 2026-02-03 15:53:45 +00:00 committed by GitHub
commit 429b18a5ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 677 additions and 187 deletions

View file

@ -9,10 +9,20 @@ services:
command: sleep infinity
volumes:
- ../../:/workspaces/model
networks:
- model-net
networks:
model-net:
driver: bridge
db:
image: postgres:17.4
restart: unless-stopped
ports:
- 5432:5432
environment:
- PGDATABASE=tech_team_local_db
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=makingwarmerhomes
volumes:
- postgres-data-two:/var/lib/postgresql/data
volumes:
postgres-data-two:

22
backend/.env.local Normal file
View file

@ -0,0 +1,22 @@
DB_HOST=db
DB_PORT=5432
DB_NAME=tech_team_local_db
DB_USERNAME=postgres
DB_PASSWORD=makingwarmerhomes
#not used
GOOGLE_SOLAR_API_KEY="test"
SAP_PREDICTIONS_BUCKET="test"
CARBON_PREDICTIONS_BUCKET="test"
HEAT_PREDICTIONS_BUCKET="test"
HEATING_KWH_PREDICTIONS_BUCKET="test"
HOTWATER_KWH_PREDICTIONS_BUCKET="test"
API_KEY="test"
ENVIRONMENT="test"
SECRET_KEY="test"
PLAN_TRIGGER_BUCKET="test"
DATA_BUCKET="test"
EPC_AUTH_TOKEN="test"
ENGINE_SQS_URL="test"
ENERGY_ASSESSMENTS_BUCKET="test"

View file

@ -42,7 +42,7 @@ class Settings(BaseSettings):
AWS_DEFAULT_REGION: Optional[str] = None
class Config:
env_file = "backend/.env"
env_file = "backend/.env.local"
@lru_cache()

View file

@ -3,7 +3,9 @@ from contextlib import contextmanager
from backend.app.config import get_settings
from sqlmodel import Session
connection_string = "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
connection_string = (
"postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
)
db_string = connection_string.format(
drivername="psycopg2", # You'll need to use psycopg2 driver for PostgreSQL
username=get_settings().DB_USERNAME,
@ -28,7 +30,9 @@ db_engine = create_engine(
def get_db_session():
if db_engine is None:
raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
raise RuntimeError(
"Database is not configured. Set DATABASE_URL in environment variables."
)
return Session(db_engine)

View file

@ -0,0 +1,12 @@
from typing import List
from sqlalchemy import insert, delete
from sqlalchemy.orm import Session
from backend.app.db.connection import db_session, db_read_session
from backend.app.db.models.condition import PropertyConditionSurveyModel
def bulk_insert_property_surveys(
session: Session, surveys: List[PropertyConditionSurveyModel]
) -> None:
raise NotImplementedError

View file

@ -0,0 +1,97 @@
from sqlalchemy import (
BigInteger,
Column,
Date,
ForeignKey,
Integer,
String,
Enum as SqlEnum,
)
from sqlalchemy.orm import declarative_base, relationship
from backend.condition.domain.aspect_type import AspectType
from backend.condition.domain.element_type import ElementType
Base = declarative_base()
ElementTypeDb = SqlEnum(
ElementType,
name="element_type",
native_enum=True,
values_callable=lambda enum: [e.value for e in enum],
)
AspectTypeDb = SqlEnum(
AspectType,
name="aspect_type",
native_enum=True,
values_callable=lambda enum: [a.value for a in enum],
)
class PropertyConditionSurveyModel(Base):
__tablename__ = "property_condition_survey"
id = Column(BigInteger, primary_key=True, autoincrement=True)
uprn = Column(BigInteger, nullable=False)
date = Column(Date, nullable=False)
source = Column(String, nullable=False)
elements = relationship(
"ElementModel",
back_populates="survey",
cascade="all, delete-orphan",
)
class ElementModel(Base):
__tablename__ = "element" # TODO: rename to survey_element?
id = Column(BigInteger, primary_key=True, autoincrement=True)
survey_id = Column(
BigInteger,
ForeignKey("property_condition_survey.id"),
nullable=False,
)
element_type = Column(ElementTypeDb, nullable=False)
element_instance = Column(BigInteger, nullable=False)
survey = relationship(
"PropertyConditionSurveyModel",
back_populates="elements",
)
aspect_conditions = relationship(
"AspectConditionModel",
back_populates="element",
cascade="all, delete-orphan",
)
class AspectConditionModel(Base):
__tablename__ = "aspect_condition" # TODO: rename to survey_aspect?
id = Column(BigInteger, primary_key=True, autoincrement=True)
element_id = Column(
BigInteger,
ForeignKey("element.id"),
nullable=False,
)
aspect_type = Column(AspectTypeDb, nullable=False)
aspect_instance = Column(BigInteger, nullable=False)
value = Column(String)
quantity = Column(Integer)
install_date = Column(Date)
renewal_year = Column(Integer)
comments = Column(String)
element = relationship(
"ElementModel",
back_populates="aspect_conditions",
)

View file

@ -20,7 +20,7 @@ The processor currently supports file formats provided by **Peabody** and **LBWF
The `local_runner` script allows the processor to be executed in a local environment.
1. Copy a sample input file into the `sample_data/` directory.
1. Copy sample input file(s) into the `sample_data/` directory. If working with Peabody data, you'll need the Landlord Reference / UPRN lookup file as well.
2. Update `local_runner.py` as required, specifically the definitions of:
- `lbwf_path`
- `peabody_path`

View file

@ -21,6 +21,8 @@ def main() -> None:
/ "2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx"
)
filepaths = [lbwf_path, peabody_path]
# filepaths = [lbwf_path]
# filepaths = [peabody_path]
for fp in filepaths:
with fp.open("rb") as f:

View file

@ -1,4 +1,4 @@
from typing import BinaryIO, Any, Dict, Iterator, List, Tuple
from typing import BinaryIO, Any, Dict, Iterator, List, Optional, Tuple
from openpyxl import Workbook, load_workbook
from collections import defaultdict
@ -15,7 +15,11 @@ logger = setup_logger()
class LbwfParser(Parser):
def parse(self, file_stream: BinaryIO) -> Any:
def parse(
self,
file_stream: BinaryIO,
location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
) -> Any:
wb: Workbook = load_workbook(file_stream)
address_to_uprn_map: Dict[str, int] = LbwfParser._generate_address_to_uprn_dict(
wb

View file

@ -1,8 +1,13 @@
from abc import ABC, abstractmethod
from typing import BinaryIO, Any
from typing import BinaryIO, Any, Dict, Optional
class Parser(ABC):
@abstractmethod
def parse(self, file_stream: BinaryIO) -> Any:
pass
def parse(
self,
file_stream: BinaryIO,
location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
) -> Any:
pass

View file

@ -1,26 +1,55 @@
from typing import Any, BinaryIO, Dict, Iterator, List, Tuple, DefaultDict
import csv
from pathlib import Path
from typing import Any, BinaryIO, Dict, List, Optional, Tuple, DefaultDict
from openpyxl import Workbook, load_workbook
from collections import defaultdict
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.peabody.peabody_asset_condition import PeabodyAssetCondition
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
PeabodyAssetCondition,
)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
from utils.logger import setup_logger
logger = setup_logger()
class PeabodyParser(Parser):
def parse(self, file_stream: BinaryIO) -> Any:
wb: Workbook = load_workbook(file_stream)
address_to_uprn_map: Dict[str, int] = PeabodyParser._generate_address_to_uprn_dict(wb)
assets = self._parse_assets(wb)
return self._group_assets_into_properties(
class PeabodyParser(Parser):
def parse(
self,
file_stream: BinaryIO,
location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
) -> Any:
wb: Workbook = load_workbook(file_stream)
if location_ref_to_uprn_map is None:
location_ref_to_uprn_map: Dict[str, int] = (
PeabodyParser._build_location_ref_to_uprn_map()
)
assets = PeabodyParser._parse_assets(wb)
return PeabodyParser._group_assets_into_properties(
assets=assets,
address_to_uprn_map=address_to_uprn_map,
location_ref_to_uprn_map=location_ref_to_uprn_map,
)
@staticmethod
def _build_location_ref_to_uprn_map() -> Dict[str, int]:
location_ref_to_uprn_filepath: Path = (
Path(__file__).resolve().parents[1]
/ "sample_data"
/ "peabody"
/ "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
)
location_ref_to_uprn_map: Dict[str, int] = {}
with location_ref_to_uprn_filepath.open(newline="") as f:
reader: Any = csv.DictReader(f)
for row in reader:
location_ref_to_uprn_map[row["reference"]] = int(row["out_uprn"])
return location_ref_to_uprn_map
@staticmethod
def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
@ -33,39 +62,43 @@ class PeabodyParser(Parser):
assets: List[PeabodyAssetCondition] = []
for row in asset_rows:
try:
asset = PeabodyParser._map_row_to_asset_record(row, asset_header_indexes)
asset = PeabodyParser._map_row_to_asset_record(
row, asset_header_indexes
)
if not asset.is_block_level:
# Block-level condition surveys are out of scope for now
# until we have a wider think on how to handle block
assets.append(asset) # TODO: handle block-level assets
# until we have a wider think on how to handle block
assets.append(asset) # TODO: handle block-level assets
except Exception as e:
logger.error(f"Error mapping Peabody row to asset record: {e}")
continue
return assets
@staticmethod
def _group_assets_into_properties(
assets: List[PeabodyAssetCondition],
address_to_uprn_map: Dict[str, int],
location_ref_to_uprn_map: Dict[str, int],
) -> List[PeabodyProperty]:
assets_by_address: DefaultDict[str, List[PeabodyAssetCondition]] = defaultdict(list)
assets_by_location_reference: DefaultDict[str, List[PeabodyAssetCondition]] = (
defaultdict(list)
)
for asset in assets:
if asset.full_address is None:
if asset.lo_reference is None:
continue
address = asset.full_address.strip()
assets_by_address[address].append(asset)
assets_by_location_reference[asset.lo_reference].append(asset)
properties: List[PeabodyProperty] = []
for address, grouped_assets in assets_by_address.items():
uprn = address_to_uprn_map.get(address)
for location_ref, grouped_assets in assets_by_location_reference.items():
uprn = location_ref_to_uprn_map.get(location_ref)
if uprn is None:
logger.warning(f"No UPRN found for address: {address}")
logger.warning(f"No UPRN found for Location Reference: {location_ref}")
continue
properties.append(
@ -77,7 +110,6 @@ class PeabodyParser(Parser):
return properties
@staticmethod
def _map_row_to_asset_record(
row: Any | Tuple[object | None, ...],
@ -102,39 +134,9 @@ class PeabodyParser(Parser):
condition_survey_date=row[header_indexes["condition_survey_date"]],
)
@staticmethod
def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
sheet = wb["Survey Records - D & Lower"]
rows: Iterator[Tuple[object | None, ...]] = sheet.iter_rows(values_only=True)
headers = next(rows)
header_indexes: Dict[str, int] = PeabodyParser._get_column_indexes_by_name(headers)
address_idx = header_indexes["full_address"]
address_to_uprn: Dict[str, int] = {}
# Generate random UPRNs for now
next_uprn = 1 # TODO: get real UPRNs
for row in rows:
address = row[address_idx]
if address is None:
continue
address = address.strip()
if address not in address_to_uprn:
address_to_uprn[address] = next_uprn
next_uprn += 1
return address_to_uprn
@staticmethod
def _get_column_indexes_by_name(
headers: Tuple[object | None, ...]
headers: Tuple[object | None, ...],
) -> Dict[str, int]:
index: Dict[str, int] = {}
@ -142,4 +144,4 @@ class PeabodyParser(Parser):
if isinstance(header, str):
index[header] = i
return index
return index

View file

@ -0,0 +1,86 @@
import time
from typing import List, Optional
from sqlmodel import Session
from utils.logger import setup_logger
from backend.app.db.models.condition import (
AspectConditionModel,
ElementModel,
PropertyConditionSurveyModel,
)
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.app.db.connection import db_session
logger = setup_logger()
class ConditionPostgres:
def bulk_insert_surveys(
self, surveys: List[PropertyConditionSurvey], batch_size: Optional[int] = 100
) -> None:
logger.info(
f"Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..."
)
survey_models: List[PropertyConditionSurveyModel] = [
ConditionPostgres.map_survey_to_model(s) for s in surveys
]
total: int = len(survey_models)
logger.info(
f"Finished mapping {total} surveys. Writing to database in batches of {batch_size}..."
)
with db_session() as session:
for start in range(0, total, batch_size):
end = min(start + batch_size, total)
batch = survey_models[start:end]
t0: float = time.perf_counter()
ConditionPostgres._insert_surveys_batch(batch, session)
elapsed: float = time.perf_counter() - t0
logger.info(
f"Inserted batch {start} - {end} ({len(batch)} surveys) in {elapsed} seconds",
)
@staticmethod
def map_survey_to_model(
survey: PropertyConditionSurvey,
) -> PropertyConditionSurveyModel:
survey_model = PropertyConditionSurveyModel(
uprn=survey.uprn,
date=survey.date,
source=survey.source,
elements=[],
)
for element in survey.elements:
element_model = ElementModel(
element_type=element.element_type,
element_instance=element.element_instance,
aspect_conditions=[],
)
for aspect in element.aspect_conditions:
aspect_model = AspectConditionModel(
aspect_type=aspect.aspect_type,
aspect_instance=aspect.aspect_instance,
value=aspect.value,
quantity=aspect.quantity,
install_date=aspect.install_date,
renewal_year=aspect.renewal_year,
comments=aspect.comments,
)
element_model.aspect_conditions.append(aspect_model)
survey_model.elements.append(element_model)
return survey_model
@staticmethod
def _insert_surveys_batch(
surveys: List[PropertyConditionSurveyModel], session: Session
) -> None:
session.add_all(surveys)
session.commit()

View file

@ -1,25 +1,33 @@
from typing import Any, BinaryIO, List
from datetime import datetime
from utils.logger import setup_logger
from backend.condition.domain.mapping.mapper import Mapper
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.condition.parsing.parser import Parser
from utils.logger import setup_logger
from backend.condition.persistence.condition_postgres import ConditionPostgres
from backend.condition.file_type import FileType, detect_file_type
from backend.condition.parsing.factory import select_parser, select_mapper
logger = setup_logger()
def process_file(file_stream: BinaryIO, source_key: str) -> None:
print(f"[processor] Received file: {source_key}")
logger.info(f"[processor] Received file: {source_key}")
# Instantiation
file_type: FileType = detect_file_type(source_key)
parser: Parser = select_parser(file_type)
mapper: Mapper = select_mapper(file_type)
persistence = ConditionPostgres()
# Orchestration
raw_properties: List[Any] = parser.parse(file_stream)
logger.info(
f"[processor] Finished loading customer survey data for {len(raw_properties)} properties. Mapping..."
)
survey_year = datetime.now().year # TODO: get this from filepath or elsewhere
property_condition_surveys: List[PropertyConditionSurvey] = []
@ -29,4 +37,10 @@ def process_file(file_stream: BinaryIO, source_key: str) -> None:
mapper.map_asset_conditions_for_property(p, survey_year)
)
print("done") # temp
logger.info(
f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..."
)
persistence.bulk_insert_surveys(property_condition_surveys)
logger.info(f"[processor] Finished loading surveys to database")

View file

@ -1,3 +1,4 @@
from backend.app.db.models.condition import PropertyConditionSurveyModel
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
@ -72,3 +73,41 @@ class CustomAsserts:
f"{actual_aspect.comments} != {expected_aspect.comments}"
)
return True
def assert_property_condition_survey_model_matches_expected(
actual_model: PropertyConditionSurveyModel,
expected: dict,
) -> None:
assert actual_model.uprn == expected["uprn"], "UPRN differs"
assert actual_model.date == expected["date"], "Date differs"
assert actual_model.source == expected["source"], "Source differs"
assert len(actual_model.elements) == len(expected["elements"]), (
f"Expected {len(expected['elements'])} elements, "
f"got {len(actual_model.elements)}"
)
for i, (actual_element, expected_element) in enumerate(
zip(actual_model.elements, expected["elements"])
):
assert (
actual_element.element_type == expected_element["element_type"]
), f"Element[{i}].element_type differs"
assert (
actual_element.element_instance == expected_element["element_instance"]
), f"Element[{i}].element_instance differs"
assert len(actual_element.aspect_conditions) == len(
expected_element["aspects"]
), f"Element[{i}] aspect count differs"
for j, (actual_aspect, expected_aspect) in enumerate(
zip(actual_element.aspect_conditions, expected_element["aspects"])
):
prefix = f"Element[{i}].Aspect[{j}]"
for key, value in expected_aspect.items():
assert getattr(actual_aspect, key) == value, (
f"{prefix}.{key} differs: "
f"{getattr(actual_aspect, key)} != {value}"
)

View file

@ -1,127 +1,141 @@
import pytest
from typing import Any
from typing import Any, Dict
from io import BytesIO
from openpyxl import Workbook
from datetime import datetime
from backend.condition.parsing.peabody_parser import PeabodyParser
from backend.condition.parsing.records.peabody.peabody_asset_condition import PeabodyAssetCondition
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
PeabodyAssetCondition,
)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
@pytest.fixture
def peabody_assets_xlsx_bytes() -> BytesIO:
wb = Workbook()
survey_records_d_and_lower = wb.active
survey_records_d_and_lower.title = "Survey Records - D & Lower"
survey_records_d_and_lower.append([
"Lo_Reference",
"full_address",
"location_type_code",
"Parent_Lo_Reference",
"Element_Code",
"Element",
"Sub_Element_Code",
"Sub_Element",
"Material_Code",
"material_or_answer",
"Renewal_Quantity",
"Renewal_Year",
"Renewal_Cost",
"cloned",
"lo_type_code",
"condition_survey_date",
])
survey_records_d_and_lower.append([
"B000RAND",
"1 RANDOM HOUSE LONDON",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025,12,4,9,17,0)
])
survey_records_d_and_lower.append([
"B000BLOCK",
"1100 BLOCK",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025,12,4,9,17,0)
])
survey_records_d_and_lower.append([
"B000FAKE",
"3 FAKE CLOSE LONDON",
3,
"FAKEEST",
100,
"GENERAL",
15,
"External Decoration",
2,
"Normal",
1,
2035,
1500.7,
"N",
3,
datetime(2025,7,5,0,0,0)
])
survey_records_d_and_lower.append([
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
54,
"HHSRS",
29,
"HHSRS Structural Collapse & Falling Elements",
4,
"HHSRS Moderate",
2,
2027,
None,
"N",
3,
None
])
survey_records_d_and_lower.append([
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
53,
"External",
2,
"Chimney",
2,
"Present",
33,
2053,
3531,
"N",
3,
None
])
survey_records_d_and_lower.append(
[
"Lo_Reference",
"full_address",
"location_type_code",
"Parent_Lo_Reference",
"Element_Code",
"Element",
"Sub_Element_Code",
"Sub_Element",
"Material_Code",
"material_or_answer",
"Renewal_Quantity",
"Renewal_Year",
"Renewal_Cost",
"cloned",
"lo_type_code",
"condition_survey_date",
]
)
survey_records_d_and_lower.append(
[
"B000RAND",
"1 RANDOM HOUSE LONDON",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025, 12, 4, 9, 17, 0),
]
)
survey_records_d_and_lower.append(
[
"B000BLOCK",
"1100 BLOCK",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025, 12, 4, 9, 17, 0),
]
)
survey_records_d_and_lower.append(
[
"B000FAKE",
"3 FAKE CLOSE LONDON",
3,
"FAKEEST",
100,
"GENERAL",
15,
"External Decoration",
2,
"Normal",
1,
2035,
1500.7,
"N",
3,
datetime(2025, 7, 5, 0, 0, 0),
]
)
survey_records_d_and_lower.append(
[
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
54,
"HHSRS",
29,
"HHSRS Structural Collapse & Falling Elements",
4,
"HHSRS Moderate",
2,
2027,
None,
"N",
3,
None,
]
)
survey_records_d_and_lower.append(
[
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
53,
"External",
2,
"Chimney",
2,
"Present",
33,
2053,
3531,
"N",
3,
None,
]
)
stream = BytesIO()
wb.save(stream)
@ -129,18 +143,32 @@ def peabody_assets_xlsx_bytes() -> BytesIO:
return stream
def test_peabody_parser_parses_conditions(peabody_assets_xlsx_bytes):
@pytest.fixture
def location_ref_to_uprn_map() -> Dict[str, int]:
return {
"B000RAND": 1,
"B000BLOCK": 2,
"B000FAKE": 3,
"B000MIS": 4,
}
def test_peabody_parser_parses_conditions(
peabody_assets_xlsx_bytes, location_ref_to_uprn_map
):
# arrange
parser = PeabodyParser()
# act
result: Any = parser.parse(peabody_assets_xlsx_bytes)
result: Any = parser.parse(peabody_assets_xlsx_bytes, location_ref_to_uprn_map)
# assert
assert len(result) == 3
assert all(isinstance(item, PeabodyProperty) for item in result)
@pytest.fixture
def asset_condition_factory():
def _factory(full_address: str) -> PeabodyAssetCondition:
@ -165,6 +193,7 @@ def asset_condition_factory():
return _factory
@pytest.mark.parametrize(
"full_address, expected_block_level",
[
@ -175,7 +204,7 @@ def asset_condition_factory():
("81A-B GORE ROAD LONDON", True),
("73 & 74 HARVEST COURT ST. ALBANS", True),
("25 HAVERSHAM COURT GREENFORD", False),
("FLAT 10 SPARROW COURT SOUTHMERE DRIVE LONDON SE2 9ES", False)
("FLAT 10 SPARROW COURT SOUTHMERE DRIVE LONDON SE2 9ES", False),
],
)
def test_peabody_asset_is_block_level(
@ -187,4 +216,4 @@ def test_peabody_asset_is_block_level(
asset_condition = asset_condition_factory(full_address)
# act + assert
assert asset_condition.is_block_level == expected_block_level
assert asset_condition.is_block_level == expected_block_level

View file

@ -0,0 +1,164 @@
import pytest
from datetime import date
from backend.condition.persistence.condition_postgres import ConditionPostgres
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.condition.domain.element import Element
from backend.condition.domain.element_type import ElementType
from backend.condition.domain.aspect_condition import AspectCondition
from backend.condition.domain.aspect_type import AspectType
from backend.app.db.models.condition import PropertyConditionSurveyModel
from backend.condition.tests.custom_asserts import CustomAsserts
def test_map_survey_to_model() -> None:
# arrange
survey = PropertyConditionSurvey(
uprn=1,
elements=[
Element(
element_type=ElementType.EXTERNAL_WINDOWS,
element_instance=1,
aspect_conditions=[
AspectCondition(
aspect_type=AspectType.MATERIAL,
aspect_instance=1,
value="UPVC Double Glazed",
quantity=8,
install_date=None,
renewal_year=2036,
comments=None,
),
],
),
Element(
element_type=ElementType.EXTERNAL_DECORATION,
element_instance=1,
aspect_conditions=[
AspectCondition(
aspect_type=AspectType.CONDITION,
aspect_instance=1,
value="Normal",
quantity=1,
install_date=None,
renewal_year=2029,
comments=None,
)
],
),
Element(
element_type=ElementType.EXTERNAL_WALL,
element_instance=1,
aspect_conditions=[
AspectCondition(
aspect_type=AspectType.FINISH,
aspect_instance=1,
value="Pointed",
quantity=65,
install_date=None,
renewal_year=2045,
comments=None,
),
AspectCondition(
aspect_type=AspectType.FINISH,
aspect_instance=1,
value="Pointing",
quantity=1,
install_date=None,
renewal_year=2069,
comments=None,
),
AspectCondition(
aspect_type=AspectType.FINISH,
aspect_instance=2,
value="Tile Hung",
quantity=8,
install_date=None,
renewal_year=2049,
comments=None,
),
],
),
],
date=date(2000, 1, 1),
source="Peabody",
)
expected = {
"uprn": 1,
"date": date(2000, 1, 1),
"source": "Peabody",
"elements": [
{
"element_type": ElementType.EXTERNAL_WINDOWS,
"element_instance": 1,
"aspects": [
{
"aspect_type": AspectType.MATERIAL,
"aspect_instance": 1,
"value": "UPVC Double Glazed",
"quantity": 8,
"install_date": None,
"renewal_year": 2036,
"comments": None,
}
],
},
{
"element_type": ElementType.EXTERNAL_DECORATION,
"element_instance": 1,
"aspects": [
{
"aspect_type": AspectType.CONDITION,
"aspect_instance": 1,
"value": "Normal",
"quantity": 1,
"install_date": None,
"renewal_year": 2029,
"comments": None,
}
],
},
{
"element_type": ElementType.EXTERNAL_WALL,
"element_instance": 1,
"aspects": [
{
"aspect_instance": 1,
"value": "Pointed",
"quantity": 65,
"install_date": None,
"renewal_year": 2045,
"comments": None,
},
{
"aspect_type": AspectType.FINISH,
"aspect_instance": 1,
"value": "Pointing",
"quantity": 1,
"install_date": None,
"renewal_year": 2069,
"comments": None,
},
{
"aspect_type": AspectType.FINISH,
"aspect_instance": 2,
"value": "Tile Hung",
"quantity": 8,
"install_date": None,
"renewal_year": 2049,
"comments": None,
},
],
},
],
}
# act
model: PropertyConditionSurveyModel = ConditionPostgres.map_survey_to_model(survey)
# assert (survey level)
CustomAsserts.assert_property_condition_survey_model_matches_expected(
model,
expected,
)