data cleaning wip

This commit is contained in:
Khalim Conn-Kowlessar 2023-06-07 23:00:59 +01:00
parent d48a40c64b
commit ec95fcf99c
7 changed files with 195 additions and 9 deletions

View file

@ -1,15 +1,31 @@
from epc_api.client import EpcClient
from epc_data.config import EPC_AUTH_TOKEN
class Property:
    """
    A single property identified by its postcode and first address line.

    The EPC record for the property is looked up lazily via
    :meth:`search_address_epc` and cached on ``self.data``.
    """

    def __init__(self, postcode, address1, epc_client=None, data=None):
        """
        :param postcode: postcode of the property
        :param address1: first line of the property's address
        :param epc_client: optional client used to query the EPC API; when omitted a
            new ``EpcClient`` is created from ``EPC_AUTH_TOKEN``
        :param data: optional, already-fetched EPC record for this property
        """
        self.postcode = postcode
        self.address1 = address1
        self.data = data

        # Sharing one client between many properties avoids building a client per instance.
        if epc_client is not None:
            self.epc_client = epc_client
        else:
            self.epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)

    def search_address_epc(self):
        """
        Look the address up in the EPC database and store the single matching record on
        ``self.data``. Does nothing if data is already present.

        :return: property data
        :raises LookupError: if the search returns no rows (the property has no EPC)
        :raises Exception: if the search returns more than one row
        """
        if self.data:
            return self.data

        response = self.epc_client.domestic.search(
            params={"address": self.address1, "postcode": self.postcode}
        )

        # A property without an EPC yields no rows (or no response at all); fail with a
        # descriptive error instead of an opaque IndexError/TypeError.
        rows = response["rows"] if response else []
        if not rows:
            raise LookupError(
                f"No EPC record found for address {self.address1!r}, postcode {self.postcode!r}"
            )

        if len(rows) > 1:
            raise Exception("More than one result found for this address - investigate me")

        self.data = rows[0]
        return self.data

View file

@ -3,10 +3,10 @@
We're using conda to manage environments to circumvent the
issues with Mac M1. This documentation will also cover Pycharm setup.
We're working in python 3.11 so
We're working in Python 3.10, so create the environment with:
```commandline
conda create -n hestia-data python=3.11
conda create -n hestia-data python=3.10
```
Then activate the environment
@ -28,3 +28,8 @@ and click OK, or select the conda environment from the dropdown.
You may need to restart Pycharm for the new interpreter to be recognised.
To install the project dependencies, navigate to /epc_data and run:
```commandline
pip install -r requirements.txt
```

6
epc_data/config.py Normal file
View file

@ -0,0 +1,6 @@
import os
from dotenv import load_dotenv
# Load variables from the project's .env file into the process environment.
# NOTE(review): the path is relative, so this presumably only resolves when the
# process is started from the repository root - confirm.
load_dotenv(dotenv_path="epc_data/.env")

# Auth token handed to EpcClient; None when the variable is not set.
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")

26
epc_data/downloader.py Normal file
View file

@ -0,0 +1,26 @@
import time
def pagenated_epc_download(client, params, page_size, n_pages, verbose=0, slowdown=0.1):
    """
    Download up to ``n_pages`` pages of domestic EPC search results and return all rows.

    :param client: EPC client exposing ``domestic.search(params=, offset_from=, size=)``
    :param params: search parameters forwarded unchanged to every request
    :param page_size: number of rows requested per page
    :param n_pages: maximum number of pages to request
    :param verbose: when truthy, print the page number before each request
    :param slowdown: seconds to sleep before each request
    :return: list with the rows of every page that was retrieved
    """
    results = []

    # Bounded loop: the page counter is what enforces the n_pages limit.
    for page_index in range(n_pages):
        if verbose:
            print(f"Pulling for page {page_index + 1}")

        time.sleep(slowdown)
        search_resp = client.domestic.search(
            params=params, offset_from=page_index * page_size, size=page_size
        )

        # Note: We can only make 10k queries for a single set of search queries.
        # It might make sense to download data via zip for machine learning since we don't need this
        # data to be perfectly up to date
        if search_resp is None:
            # No response for this offset: there is nothing further to pull.
            break

        results.extend(search_resp["rows"])

    return results

View file

@ -1 +1,4 @@
epc-api-python
epc-api-python
python-dotenv
tqdm
pandas

View file

@ -4,10 +4,6 @@ input_data = [
"address1": "28 Distillery Wharf",
"postcode": "w6 9bf"
},
{
"address1": "23 Bulter House",
"postcode": "e2 0pn"
},
{
"address1": "Flat 14 Godley V C House",
"postcode": "E2 0LP"

View file

@ -1,8 +1,142 @@
from tqdm import tqdm
from epc_data.temp_inputs import input_data
from epc_data.Property import Property
from epc_data.config import EPC_AUTH_TOKEN
from epc_api.client import EpcClient
from epc_data.downloader import pagenated_epc_download
def handler():
    """
    Resolve the EPC record of every input property, then download EPC data for each
    local authority those properties belong to.

    One ``EpcClient`` is created and shared by all ``Property`` instances and by the
    paginated downloads.
    """
    # To begin with, the input data is a list of dictionaries, however we would read this file in
    epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)

    input_properties = [
        Property(postcode=config['postcode'], address1=config['address1'], epc_client=epc_client)
        for config in input_data
    ]

    for p in input_properties:
        p.search_address_epc()

    # A set, so each local authority is downloaded once even if several properties share it.
    local_authorities = {p.data['local-authority'] for p in input_properties}

    data = []
    for la in tqdm(local_authorities):
        # pagenated_epc_download already returns the flat list of rows, so it is
        # extended directly rather than indexed with ["rows"].
        data.extend(
            pagenated_epc_download(
                client=epc_client,
                params={"local-authority": la},
                page_size=5000,
                n_pages=10,
            )
        )
# TODO: Temp - pull in sample
from collections import Counter
import pickle
from pprint import pprint
# Load a locally stored sample of EPC rows.
# NOTE(review): pickle.load can execute arbitrary code - only load files you trust.
with open("./epc_data/test_epc_data.obj", "rb") as f:
    data = pickle.load(f)

# TODO: Fill this
# Free-text description fields to be cleaned.
ClEANING_FIELDS = [
    "roof-description",
    "floor-description",
    "walls-description",
    "mainheat-description",
]

# Count how often each distinct value of the chosen field occurs in the sample.
field = "roof-description"
unique_vals = Counter(v[field] for v in data)
pprint(unique_vals)
def search_description_options(desc):
    """
    Map a qualitative insulation keyword to its insulation level.

    :param desc: keyword extracted from a description, e.g. "insulated"
    :return: "average" for "insulated"
    :raises Exception: for any other keyword (not handled yet)
    """
    if desc != "insulated":
        raise Exception("Handle me")
    return "average"
def find_insulation_thickness(description_lower, is_pitched, is_roof_room, is_flat):
    """
    Derive the insulation thickness from a lower-cased roof description.

    :param description_lower: roof description, already lower-cased
    :param is_pitched: whether the description is for a pitched roof
    :param is_roof_room: whether the description is for a roof room
    :param is_flat: whether the description is for a flat roof
    :return: 0 when the description says "no insulation"; the integer in front of "mm"
        for a pitched roof when one can be parsed; otherwise whatever
        search_description_options returns for the first word after the roof type
    :raises Exception: when none of the roof types applies
    """
    if "no insulation" in description_lower:
        return 0

    if is_pitched:
        after_pitched = description_lower.split("pitched,")[-1]
        try:
            # e.g. "pitched, 250 mm loft insulation" -> 250
            return int(after_pitched.split("mm")[0].strip())
        except ValueError:
            # No numeric thickness given: fall back to the qualitative keyword.
            keyword = after_pitched.lstrip().split(" ")[0]
            return search_description_options(keyword)

    # Roof rooms and flat roofs are only matched on the keyword after the roof type.
    for applies, marker in ((is_roof_room, "roof room(s),"), (is_flat, "flat,")):
        if applies:
            keyword = description_lower.split(marker)[-1].lstrip().split(" ")[0]
            return search_description_options(keyword)

    raise Exception("Unhandled")
def clean_roof(description):
    """
    Extract features from a free-text roof description so the roof can be characterised.

    The description is lower-cased and stripped, then checked for:
    - whether the roof is pitched
    - whether there is a roof room
    - whether there is a loft
    - whether the roof is flat
    - whether the description is marked as assumed
    - the insulation thickness (via find_insulation_thickness)

    A description mentioning "another dwelling above" is reported with an insulation
    thickness of 0 and no pitched roof, roof room or loft.

    :param description: raw roof description
    :return: dict with the keys is_pitched, is_roof_room, has_loft, insulation_thickness,
        has_dwelling_above, assumed and is_flat
    :raises Exception: when the description mentions neither "insulation" nor
        "insulated" and there is no dwelling above (not implemented yet)
    """
    description_lower = description.lower().strip()
    assumed = "assumed" in description_lower
    is_flat = "flat" in description_lower

    if "another dwelling above" in description_lower:
        return {
            "is_pitched": False,
            "is_roof_room": False,
            "has_loft": False,
            "insulation_thickness": 0,
            "has_dwelling_above": True,
            "assumed": assumed,
            "is_flat": is_flat,
        }

    if "insulation" not in description_lower and "insulated" not in description_lower:
        raise Exception("Implment me 2")

    is_pitched = "pitched" in description_lower
    is_roof_room = "roof room" in description_lower

    return {
        "is_pitched": is_pitched,
        "is_roof_room": is_roof_room,
        "has_loft": "loft" in description_lower,
        "insulation_thickness": find_insulation_thickness(
            description_lower, is_pitched, is_roof_room, is_flat
        ),
        "has_dwelling_above": False,
        "assumed": assumed,
        "is_flat": is_flat,
    }
# Clean every distinct roof description once, keeping the original text next to the
# extracted attributes. Kept as a loop so the entries cleaned so far remain available
# if clean_roof raises on a description it cannot handle yet.
cleaned_roof = []
for description in unique_vals:
    cleaned = clean_roof(description)
    cleaned_roof.append({"original": description, "cleaned": cleaned})