set up cottons asset list

This commit is contained in:
Khalim Conn-Kowlessar 2024-12-02 17:50:08 +00:00
parent c806ef7151
commit 3e8a1bc4fd
5 changed files with 540 additions and 1 deletions

View file

@ -0,0 +1,394 @@
import os
from msal import ConfidentialClientApplication
from datetime import datetime, timedelta
import requests
from functools import wraps
import time
import logging
from io import BytesIO
import pandas as pd
# Configure logging
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def handle_error(response):
"""
Handle errors based on HTTP status codes and log detailed information.
"""
try:
error_json = response.json().get('error', {})
except ValueError:
error_json = {}
error_code = error_json.get('code', 'unknownError')
error_message = error_json.get('message', 'No detailed error message provided.')
inner_error = error_json.get('innererror', {})
details = error_json.get('details', [])
logger.error(f"Error Code: {error_code}")
logger.error(f"Error Message: {error_message}")
if inner_error:
logger.error(f"Inner Error: {inner_error}")
if details:
logger.error(f"Error Details: {details}")
if response.status_code == 401:
logger.error("Unauthorized. Token might be invalid.")
elif response.status_code == 403:
logger.error("Forbidden. Access denied to the requested resource.")
elif response.status_code == 404:
logger.error("Not Found. The requested resource doesnt exist.")
elif response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 5)) # Default to 5 seconds if not provided
logger.warning(f"Too Many Requests. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
return 'retry'
elif response.status_code in (500, 503):
retry_after = int(response.headers.get('Retry-After', 5)) # Default to 5 seconds if not provided
logger.error(f"Server error. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
return 'retry'
else:
raise ValueError(f"API request failed with status code {response.status_code} - {error_message}")
raise ValueError(f"API request failed with status code {response.status_code} - {error_message}")
def api_call_decorator(func):
"""
Handles various aspects of the API call, including refreshing the access token if needed and handling pagination.
:param func: The function to be decorated.
:return: The wrapped function.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
# Check and refresh the access token if needed
if self.is_access_token_expired():
self.retrieve_access_token()
logger.info("Access token refreshed.")
# Get the HTTP method, URL, and optionally data from the function
http_method, url, data = func(self, *args, **kwargs)
# Initialize the results list and handle pagination if page_size is provided
results = []
page_size = kwargs.get('page_size', None)
response_data = {}
while url:
response = requests.request(http_method, url, headers=self.headers, json=data)
# Handle the response
if response.status_code == 200:
response_json = response.json() # Store the response JSON
if page_size:
results.extend(response_json.get('value', []))
url = response_json.get('@odata.nextLink', None)
else:
response_data = response_json # Capture the full response for consistency
break
else:
retry = handle_error(response)
if retry == 'retry':
continue
if page_size:
response_data = {'value': results}
return response_data
except Exception as e:
logger.exception("An error occurred during the API call.")
raise e
return wrapper
class SharePointClient:
access_token = None
access_token_request_timestamp = None
access_token_expiry = None
headers = None
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
def __init__(self, tenant_id, client_id, client_secret, site_id, access_token=None,
access_token_expiration_details=None):
"""
Initializes the SharePointClient with necessary credentials and site information.
:param tenant_id: The tenant ID.
:param client_id: The client ID.
:param client_secret: The client secret.
:param site_id: The site ID.
:param access_token: The access token (optional)
:param access_token_expiration_details: The access token expiration details (optional)
"""
self.tenant_id = tenant_id
self.client_id = client_id
self.client_secret = client_secret
if access_token:
if not access_token_expiration_details:
raise ValueError("Access token expiration details must be provided.")
self.access_token = access_token
self.set_access_token_expiration_details(access_token_expiration_details)
self.headers = {
'Authorization': f"Bearer {self.access_token['access_token']}"
}
else:
self.retrieve_access_token()
# Retrieve static identifiers
self.site_id = site_id
self.document_drive = self.get_documents_drive()
def get_token_expiration_details(self):
"""
Returns the access token expiration details. Converts the datetime objects to strings for serialization.
:return:
"""
return {
'access_token_request_timestamp': datetime.strftime(
self.access_token_request_timestamp, self.TIMESTAMP_FORMAT
),
'access_token_expiry': datetime.strftime(self.access_token_expiry, self.TIMESTAMP_FORMAT)
}
def set_access_token_expiration_details(self, access_token_expiration_details):
"""
Sets the access token expiration details from a serialized dictionary.
:param access_token_expiration_details: The serialized access token expiration details.
:return:
"""
self.access_token_request_timestamp = datetime.strptime(
access_token_expiration_details['access_token_request_timestamp'], self.TIMESTAMP_FORMAT
)
self.access_token_expiry = datetime.strptime(
access_token_expiration_details['access_token_expiry'], self.TIMESTAMP_FORMAT
)
def is_access_token_expired(self):
"""
Checks if the access token has expired. If it has, a new access token is retrieved.
:return: True if expired, False otherwise.
"""
return datetime.now() >= self.access_token_expiry
def retrieve_access_token(self, refresh=False):
"""
Implements authentication using MSAL.
:param refresh: If True, force a refresh of the access token.
:return: None
"""
app = ConfidentialClientApplication(
self.client_id,
authority=f"https://login.microsoftonline.com/{self.tenant_id}",
client_credential=self.client_secret
)
scope = ["https://graph.microsoft.com/.default"]
access_token_request_timestamp = datetime.now()
if refresh:
logger.info("Forcing refresh of access token.")
token = app.acquire_token_for_client(scopes=scope)
else:
# Check if a token is already cached
token = app.acquire_token_silent(scope, account=None)
if not token:
token = app.acquire_token_for_client(scopes=scope)
if "access_token" not in token:
logger.error("Authentication failed.")
raise ValueError("Authentication failed")
access_token_expiry = access_token_request_timestamp + timedelta(
seconds=token['expires_in'] - 20
)
self.access_token = token
self.access_token_request_timestamp = access_token_request_timestamp
self.access_token_expiry = access_token_expiry
self.headers = {
'Authorization': f"Bearer {self.access_token['access_token']}"
}
logger.info("Access token retrieved successfully.")
@api_call_decorator
def get_documents_drive(self):
"""
Get the document drive of the SharePoint site.
:return: Tuple containing HTTP method, URL, and None for data.
"""
url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
logger.info(f"Getting document drive from URL: {url}")
return 'GET', url, None
@api_call_decorator
def list_folder_contents(self, drive_id, folder_path: str, page_size: int = 100):
"""
This function will list the contents of a folder in SharePoint.
:param drive_id: The ID of the drive.
:param folder_path: The path of the folder.
:param page_size: The number of items per page (default is 100).
:return: Tuple containing HTTP method, URL, and None for data.
"""
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{folder_path}:/children?$top={page_size}"
logger.info(f"Listing folder contents from URL: {url}")
return 'GET', url, None
@staticmethod
def download_sharepoint_file(download_url):
"""
Downloads a file from the given URL and returns its content.
:param download_url: The URL to download the file from.
:return: The content of the downloaded file.
"""
response = requests.get(download_url, stream=True)
response.raise_for_status() # Check if the request was successful
file_content = BytesIO()
# Read the file content into memory
for chunk in response.iter_content(chunk_size=8192):
file_content.write(chunk)
file_content.seek(0) # Reset the file pointer to the beginning
return file_content
def app():
# Customers for WC 18/11/2024
#
# ----- Eastlight location -----
# No data this week, low on data
# Housing Associations/Eastlight/Survey Outcomes/
#
# ----- Settle location -----
# No data this week, in separate files
# Housing Associations/Settle/Survey Outcomes/
#
# ----- Community Housing -----
# In separate files - will we get to a singular form?
# Housing Associations/Community Housing/Survey Outcomes/
#
# ----- ACIS location -----
# Doesn't have this week's data
# Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx
#
# ----- Southern location -----
#
#
# ------ Unitas location ------
# Does have this week's data
# Unitas location: Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx
locations = {
"Unitas": "Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx",
"Eastlight": "Housing Associations/Eastlight/Survey Outcomes/",
"Settle": "Housing Associations/Settle/Survey Outcomes/",
"Community Housing": "Housing Associations/Community Housing/Survey Outcomes/",
"ACIS": "Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx",
"Southern": None,
}
SHAREPOINT_CLIENT_ID = os.getenv("SHAREPOINT_CLIENT_ID", None)
SHAREPOINT_CLIENT_SECRET = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
SHAREPOINT_TENANT_ID = os.getenv("SHAREPOINT_TENANT_ID", None)
WARMFRONT_SHAREPOINT_SITE_ID = os.getenv("WARMFRONT_SHAREPOINT_SITE_ID", None)
sharepoint_client = SharePointClient(
tenant_id=SHAREPOINT_TENANT_ID,
client_id=SHAREPOINT_CLIENT_ID,
client_secret=SHAREPOINT_CLIENT_SECRET,
site_id=WARMFRONT_SHAREPOINT_SITE_ID
)
results = []
for customer, location in locations.items():
if location is None:
continue
if location.endswith(".xlsx"):
# Read in the file
# List the contents of the folder
location_folder = os.path.dirname(location)
contents = sharepoint_client.list_folder_contents(
drive_id=sharepoint_client.document_drive["id"],
folder_path=location_folder
)
filepaths = contents["value"]
download_url = next(
(file['@microsoft.graph.downloadUrl'] for file in filepaths
if '@microsoft.graph.downloadUrl' in file and file['name'] == os.path.basename(location)),
None
)
if download_url is None:
raise ValueError("File not found in the SharePoint folder.")
file_content = sharepoint_client.download_sharepoint_file(download_url)
# Convert to pandas dataframe since file is an excel file
df = pd.read_excel(file_content)
df["Outcome"] = df["Outcome"].str.strip().str.lower()
# We cannot group by funding type accurately because any job that is not funded will have a NaN value
# and therefore we have a 100% acces rate for funded jobs and 0% otherwise
surveyor_outcomes = []
for (week, surveyor, funding), group in df.groupby(["Week Commencing", "DEA/REA"]):
funding_type = [x for x in group["Funding Type"].unique() if not pd.isnull(x)]
if funding_type:
funding_type = " + ".join(funding_type)
else:
funding_type = "No Funding"
surveyed = group[group["Outcome"] == "surveyed"]
no_answer = group[
group["Outcome"] == "no answer"
]
other_issue = group[~group["Outcome"].isin(["surveyed", "no answer"])]
surveyor_outcomes.append(
{
"Surveyor": surveyor,
"Week": week,
"Funding": funding_type,
"Surveyed": surveyed.shape[0],
"No Answer": no_answer.shape[0],
"Other Issue": other_issue.shape[0],
}
)
surveyor_outcomes = pd.DataFrame(surveyor_outcomes)
surveyor_outcomes["Week"] = pd.to_datetime(surveyor_outcomes["Week"])
weekly_access = (
surveyor_outcomes.drop(columns=["Surveyor"]).groupby(["Week", "Funding"]).sum().reset_index()
)
# Sort by week and surveyor ascending
surveyor_outcomes = surveyor_outcomes.sort_values(["Week", "Surveyor"], ascending=[True, True])
surveyor_outcomes["Access Rate"] = 100 * surveyor_outcomes["Surveyed"] / (
surveyor_outcomes["Surveyed"] + surveyor_outcomes["No Answer"] + surveyor_outcomes["Other Issue"]
)
weekly_access["Total"] = (
weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
)
weekly_access["Access Rate"] = 100 * weekly_access["Surveyed"] / (
weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
)

View file

@ -0,0 +1,11 @@
python-docx==0.8.11
PyPDF2==3.0.1
boto3
requests
pandas
pyarrow==12.0.1
openpyxl==3.1.2
usaddress==0.5.10
pdfplumber==0.10.3
msgpack==1.0.5
msal

View file

@ -0,0 +1,102 @@
import os
import time
from tqdm import tqdm
import pandas as pd
from dotenv import load_dotenv
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.SearchEpc import SearchEpc
from utils.s3 import save_csv_to_s3
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
PORTFOLIO_ID = 121
USER_ID = 8
def app():
"""
Prepares the inputs to produce the remote assessments for Cottons
:return:
"""
# Read in the asset list
cottons_asset_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Cottons/Cottons Asset List EPC Data Pull.xlsx"
)
# A number are missing EPCs due to the space in the postcode
# Breakdowns:
# C 119
# D 106
# E 26
# B 5
#
# Take the EPC D/E properties
asset_list = cottons_asset_list[
cottons_asset_list["EPC rating on register"].isin(["D", "E"])
]
asset_list = asset_list.reset_index(drop=True)
asset_list["row_id"] = asset_list.index
asset_list["uprn"] = asset_list["uprn"].astype(int)
extracted_data = []
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
add1 = home["address1"]
pc = home["postcode"]
# Retrieve the EPC data
epc_searcher = SearchEpc(
address1=add1,
postcode=pc, uprn=home["uprn"], auth_token=EPC_AUTH_TOKEN, os_api_key=""
)
epc_searcher.find_property(skip_os=True)
find_epc_searcher = RetrieveFindMyEpc(address=epc_searcher.newest_epc["address1"],
postcode=epc_searcher.newest_epc["postcode"])
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
time.sleep(0.5)
# We need uprn
extracted_data.append(
{
"uprn": home["uprn"],
**find_epc_data,
}
)
non_invasive_recommendations = [
{
"uprn": r["uprn"],
"recommendations": r["recommendations"]
} for r in extracted_data
]
filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(asset_list),
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
# Store the non-invasive recommendations in s3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Social",
"goal": "Increasing EPC",
"goal_value": "C",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Wave 3 Packages",
"multi_plan": True,
"budget": None,
}
print(body)

View file

@ -295,6 +295,7 @@ class RetrieveFindMyEpc:
"Change room heaters to condensing boiler": ["boiler_upgrade"],
"Cylinder thermostat": ["cylinder_thermostat"],
"Heat recovery system for mixer showers": ["heat_recovery_shower"],
"Room-in-roof insulation": ["room_in_roof_insulation"],
}
survey = True

View file

@ -30,9 +30,12 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column):
postcode = home[postcode_column]
house_number = home[address1_column]
full_address = home[fulladdress_column]
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
searcher = SearchEpc(
address1=str(house_number),
address1=str(house_no),
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
@ -46,6 +49,34 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column):
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
# Check if we have a flat or appartment
if searcher.newest_epc is None:
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")[1].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
address1=add1,
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
)
if (
"flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
house_number.lower()
):
searcher.ordnance_survey_client.property_type = "Flat"
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
no_epc.append(home["row_id"])
continue