mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
116 lines
3.8 KiB
Python
116 lines
3.8 KiB
Python
import pytest
|
|
|
|
from domain.addresses.postcode_batching import iter_postcode_grouped_batches
|
|
from domain.addresses.unstandardised_address import AddressList, UnstandardisedAddress
|
|
from domain.postcode import Postcode
|
|
|
|
|
|
def _addrs(postcode: str, n: int) -> AddressList:
    """Build an ``AddressList`` of ``n`` addresses that all share ``postcode``.

    Each entry's address text is ``"<i> <postcode> Street"`` for ``i`` in
    ``range(n)``, so entries within one group are distinguishable by index.
    """
    entries = [
        UnstandardisedAddress(
            address=f"{i} {postcode} Street",
            postcode=Postcode(postcode),
        )
        for i in range(n)
    ]
    return AddressList(entries)
|
|
|
|
|
|
def test_empty_input_yields_no_batches() -> None:
    # An empty input sequence must produce no batches at all.
    # act
    produced = list(iter_postcode_grouped_batches([]))

    # assert
    assert produced == []
|
|
|
|
|
|
def test_single_batch_under_cap() -> None:
    # Five addresses against a cap of 500: everything fits in one batch
    # that compares equal to the input.
    # arrange
    first_group = _addrs("AA1 1AA", 3)
    second_group = _addrs("BB2 2BB", 2)
    combined = first_group + second_group

    # act
    produced = list(iter_postcode_grouped_batches(combined, max_batch_size=500))

    # assert
    assert len(produced) == 1
    only_batch = produced[0]
    assert only_batch == combined
|
|
|
|
|
|
def test_multiple_postcodes_packed_into_one_batch_up_to_cap() -> None:
    # Group sizes 3 + 2 land exactly on the cap of 5, so both groups must
    # share a single batch rather than being flushed early.
    # arrange
    cap = 5
    first_group = _addrs("AA1 1AA", 3)
    second_group = _addrs("BB2 2BB", 2)
    combined = first_group + second_group

    # act
    produced = list(iter_postcode_grouped_batches(combined, max_batch_size=cap))

    # assert
    assert len(produced) == 1
    only_batch = produced[0]
    assert len(only_batch) == 5
|
|
|
|
|
|
def test_flush_on_overflow_before_adding_next_postcode() -> None:
    # With a cap of 5, the first group occupies 3 slots and the second
    # group of 3 does not fit alongside it: the pending batch is emitted
    # first and the second group opens a new one.
    # arrange
    cap = 5
    first_group = _addrs("AA1 1AA", 3)
    second_group = _addrs("BB2 2BB", 3)
    combined = first_group + second_group

    # act
    produced = list(iter_postcode_grouped_batches(combined, max_batch_size=cap))

    # assert
    assert len(produced) == 2
    first_batch = produced[0]
    first_postcodes = [str(entry.postcode) for entry in first_batch]
    assert first_postcodes == ["AA11AA"] * 3
    second_batch = produced[1]
    second_postcodes = [str(entry.postcode) for entry in second_batch]
    assert second_postcodes == ["BB22BB"] * 3
|
|
|
|
|
|
def test_single_postcode_group_exceeding_cap_is_dispatched_whole() -> None:
    # A postcode is never split by the cap: seven addresses for one
    # postcode against a cap of 5 come out as a single batch of 7.
    # arrange
    cap = 5
    oversize_group = _addrs("AA1 1AA", 7)

    # act
    produced = list(
        iter_postcode_grouped_batches(oversize_group, max_batch_size=cap)
    )

    # assert
    assert len(produced) == 1
    only_batch = produced[0]
    assert len(only_batch) == 7
|
|
|
|
|
|
def test_oversize_group_flushes_existing_buffer_first() -> None:
    # Counterpart of the legacy ``if buffer: flush`` branch: when an
    # oversize group arrives, whatever is already buffered must be emitted
    # on its own first -- neither dropped nor mixed into the big batch.
    # arrange
    cap = 5
    buffered = _addrs("AA1 1AA", 2)
    oversize = _addrs("BB2 2BB", 7)
    trailing = _addrs("CC3 3CC", 1)
    combined = buffered + oversize + trailing

    # act
    produced = list(iter_postcode_grouped_batches(combined, max_batch_size=cap))

    # assert
    assert len(produced) == 3
    leading_postcodes = [str(entry.postcode) for entry in produced[0]]
    assert leading_postcodes == ["AA11AA", "AA11AA"]
    oversize_postcodes = [str(entry.postcode) for entry in produced[1]]
    assert oversize_postcodes == ["BB22BB"] * 7
    trailing_postcodes = [str(entry.postcode) for entry in produced[2]]
    assert trailing_postcodes == ["CC33CC"]
|
|
|
|
|
|
def test_final_flush_yields_remaining_buffer() -> None:
    # The cap is never reached here, so the only way the addresses can
    # come out is via the flush of whatever is still buffered at the end.
    # arrange
    first_group = _addrs("AA1 1AA", 2)
    second_group = _addrs("BB2 2BB", 2)
    combined = first_group + second_group

    # act
    produced = list(iter_postcode_grouped_batches(combined, max_batch_size=500))

    # assert
    expected = [combined]
    assert produced == expected
|
|
|
|
|
|
def test_postcode_grouping_preserves_first_seen_order() -> None:
    # Addresses arrive interleaved (ZZ, AA, ZZ, AA). The output must be
    # grouped per postcode, with groups ordered by first appearance in
    # the input rather than sorted alphabetically.
    # arrange
    zz_first, zz_second = _addrs("ZZ9 9ZZ", 2)
    aa_first, aa_second = _addrs("AA1 1AA", 2)
    interleaved = [zz_first, aa_first, zz_second, aa_second]

    # act
    produced = list(iter_postcode_grouped_batches(interleaved))

    # assert
    assert len(produced) == 1
    seen_postcodes = [str(entry.postcode) for entry in produced[0]]
    expected_postcodes = ["ZZ99ZZ", "ZZ99ZZ", "AA11AA", "AA11AA"]
    assert seen_postcodes == expected_postcodes
|
|
|
|
|
|
def test_invalid_max_batch_size_raises() -> None:
    # A cap of 0 must be rejected with a ValueError whose message mentions
    # ``max_batch_size``.
    # act / assert
    with pytest.raises(ValueError, match="max_batch_size"):
        # Both the call and the drain stay inside the context manager so
        # the error is caught whether it is raised on call or on iteration.
        pending = iter_postcode_grouped_batches([], max_batch_size=0)
        list(pending)
|