data pull pipeline ready

This commit is contained in:
Khalim Conn-Kowlessar 2024-03-15 17:53:18 +00:00
parent 12f780a089
commit 6423ab2fac
2 changed files with 61 additions and 50 deletions

View file

@ -147,6 +147,7 @@ class SearchEpc:
uprn: [int, None] = None,
size=None,
property_type=None,
fast=False
):
"""
Address lines 1 and postcode are mandatory fields. The other address lines are optional
@ -187,6 +188,7 @@ class SearchEpc:
self.size = size if size is not None else 25
self.property_type = property_type
self.fast = fast
@classmethod
def get_house_number(cls, address: str) -> str | None:
@ -365,9 +367,6 @@ class SearchEpc:
# Finally, we identify the newest epc and the rest, and then return
newest_epc, older_epcs = self.filter_newest_epc(list_of_epcs=rows)
# Retrieve postcode and address
address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)
# Get the uprn from the newest record for this home
uprns = {r["uprn"] for r in rows if r["uprn"]}
# We can sometimes have no uprn for a property
@ -384,6 +383,12 @@ class SearchEpc:
uprn = uprns.pop() if uprns else None
if self.fast:
return newest_epc, [], {}, "", "", None
# Retrieve postcode and address
address_epc, postcode_epc = self.format_address(newest_epc=newest_epc)
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
@staticmethod

View file

@ -5461,9 +5461,9 @@ def forecast_remaining_sales(loader):
def fml_data_pull(loader):
has_bruh = [
# "HA7", "HA14", "HA25", "HA39", "HA16", "HA28",
# "HA7", "HA14", "HA25", "HA39", "HA16", "HA28", "HA13",
# Updated get_property_type_and_built_form, still needs running
"HA13", "HA50", "HA24", "HA15", "HA32", "HA28", "HA6", "HA1", "HA107", "HA41", "HA48", "HA2", "HA63", "HA12",
"HA50", "HA24", "HA15", "HA32", "HA28", "HA6", "HA1", "HA107", "HA41", "HA48", "HA2", "HA63", "HA12",
"HA117", "HA35", "HA34", "HA56", "HA19", "HA18", "HA9", "HA27", "HA30", "HA31", "HA54", "HA49",
# todo
]
@ -5474,57 +5474,63 @@ def fml_data_pull(loader):
from backend.SearchEpc import SearchEpc
epc_api_key = "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA="
failed_has = []
for ha in has_bruh:
asset_list = loader.data[ha]["asset_list"].copy()
# properties found as eligible
fml = asset_list[asset_list["ECO Eligibility"] != "not eligible"]
print(f"Pulling data for {ha}")
try:
asset_list = loader.data[ha]["asset_list"].copy()
# properties found as eligible
fml = asset_list[asset_list["ECO Eligibility"] != "not eligible"]
# For each property, search for the latest EPC
epc_data = []
for _, row in tqdm(fml.iterrows(), total=fml.shape[0]):
# For each property, search for the latest EPC
epc_data = []
for _, row in tqdm(fml.iterrows(), total=fml.shape[0]):
property_type, _ = get_property_type_and_built_form(property_meta=row, ha_name=ha)
property_type, _ = get_property_type_and_built_form(property_meta=row, ha_name=ha)
if ha == "HAXXX":
to_join = [str(x) for x in
[row["Door Number"], row["Address Line 1"], row["Address Line 2"], row["Address Line 3"],
row["Postcode"]] if x is not None]
full_address = ", ".join(to_join)
else:
full_address = row["matching_address"]
if ha == "HAXXX":
to_join = [str(x) for x in
[row["Door Number"], row["Address Line 1"], row["Address Line 2"], row["Address Line 3"],
row["Postcode"]] if x is not None]
full_address = ", ".join(to_join)
else:
full_address = row["matching_address"]
searcher = SearchEpc(
address1=str(row["HouseNo"]),
postcode=row["matching_postcode"],
auth_token=epc_api_key,
os_api_key="",
property_type=property_type,
full_address=full_address,
searcher = SearchEpc(
address1=str(row["HouseNo"]),
postcode=row["matching_postcode"],
auth_token=epc_api_key,
os_api_key="",
property_type=property_type,
full_address=full_address,
fast=True
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
epc = {
"asset_list_row_id": row["asset_list_row_id"],
**searcher.newest_epc.copy()
}
epc_data.append(epc)
# Remove None entries
epc_data = [x for x in epc_data if x is not None]
# Save the data in S3 as a pickle
epc_data_df = pd.DataFrame(epc_data)
save_pickle_to_s3(
data=epc_data_df,
bucket_name="retrofit-datalake-dev",
s3_file_name=f"ha-analysis/revised/{ha}/epc_data.pickle"
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
epc = {
"asset_list_row_id": row["asset_list_row_id"],
**searcher.newest_epc.copy()
}
epc_data.append(epc)
# Remove None entries
epc_data = [x for x in epc_data if x is not None]
# Save the data in S3 as a pickle
epc_data_df = pd.DataFrame(epc_data)
save_pickle_to_s3(
data=epc_data_df,
bucket_name="retrofit-datalake-dev",
s3_file_name=f"ha-analysis/revised/{ha}/epc_data.pickle"
)
except Exception as e:
failed_has.append(ha)
def extract_lower_bound(age_band):