# Configure logging
logger = logging.getLogger(__name__)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Wait (seconds) used when the server sends no usable Retry-After header.
_DEFAULT_RETRY_AFTER_SECONDS = 5
# Status codes that are retried after a pause (in addition to 429).
_RETRYABLE_SERVER_ERRORS = (500, 503)
# Upper bound on consecutive retries of one request, so a permanently failing
# endpoint cannot keep the caller looping forever.
_MAX_RETRIES = 5
# Per-request socket timeout (seconds); without it a stalled connection hangs.
_REQUEST_TIMEOUT_SECONDS = 60


def _extract_error_payload(response):
    """
    Return the ``error`` object of an error response as a dictionary.
    :param response: The HTTP response (must expose ``json()``).
    :return: The error dictionary, or an empty dictionary when the body is not JSON,
        is not a JSON object, or carries no error object.
    """
    try:
        body = response.json()
    except ValueError:
        return {}

    error = body.get('error', {}) if isinstance(body, dict) else {}
    if isinstance(error, dict):
        return error
    if error:
        # Some endpoints put a bare string under "error"; keep it as the code.
        return {'code': str(error)}
    return {}


def _parse_retry_after(response):
    """
    Read the Retry-After header as a non-negative number of seconds.
    :param response: The HTTP response (must expose ``headers``).
    :return: The parsed delay, or the default delay when the header is missing or not an integer.
    """
    raw_value = response.headers.get('Retry-After')
    try:
        seconds = int(raw_value)
    except (TypeError, ValueError):
        # Header absent, or given as an HTTP-date rather than a number of seconds.
        return _DEFAULT_RETRY_AFTER_SECONDS
    return max(seconds, 0)


def handle_error(response):
    """
    Handle errors based on HTTP status codes and log detailed information.

    For 429, 500 and 503 the function sleeps for the Retry-After delay and asks the caller to retry.
    Every other status is logged and turned into an exception.
    :param response: The non-successful HTTP response.
    :return: The string 'retry' when the request should be attempted again.
    :raises ValueError: For every status that is not retryable.
    """
    error_json = _extract_error_payload(response)

    error_code = error_json.get('code', 'unknownError')
    error_message = error_json.get('message', 'No detailed error message provided.')
    inner_error = error_json.get('innererror', {})
    details = error_json.get('details', [])

    logger.error("Error Code: %s", error_code)
    logger.error("Error Message: %s", error_message)
    if inner_error:
        logger.error("Inner Error: %s", inner_error)
    if details:
        logger.error("Error Details: %s", details)

    status = response.status_code

    if status == 429:
        retry_after = _parse_retry_after(response)
        logger.warning("Too Many Requests. Retrying after %s seconds...", retry_after)
        time.sleep(retry_after)
        return 'retry'

    if status in _RETRYABLE_SERVER_ERRORS:
        retry_after = _parse_retry_after(response)
        logger.error("Server error. Retrying after %s seconds...", retry_after)
        time.sleep(retry_after)
        return 'retry'

    if status == 401:
        logger.error("Unauthorized. Token might be invalid.")
    elif status == 403:
        logger.error("Forbidden. Access denied to the requested resource.")
    elif status == 404:
        logger.error("Not Found. The requested resource doesn’t exist.")

    raise ValueError(f"API request failed with status code {status} - {error_message}")


def api_call_decorator(func):
    """
    Handles various aspects of the API call, including refreshing the access token if needed and handling pagination.

    The decorated method must return a ``(http_method, url, data)`` tuple. When the method has a truthy
    ``page_size`` argument (passed by keyword, positionally, or taken from its declared default), every page is
    followed through ``@odata.nextLink`` and the result is ``{'value': [...all items...]}``. Otherwise the JSON of
    the single response is returned unchanged.
    :param func: The function to be decorated.
    :return: The wrapped function.
    """
    # Local import: only this decorator needs it.
    import inspect

    signature = inspect.signature(func)

    def _requested_page_size(self, args, kwargs):
        # Bind the call so that positional values and declared defaults are seen too;
        # looking at kwargs alone misses both and silently disables pagination.
        bound = signature.bind(self, *args, **kwargs)
        bound.apply_defaults()
        return bound.arguments.get('page_size', kwargs.get('page_size'))

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            # Check and refresh the access token if needed
            if self.is_access_token_expired():
                self.retrieve_access_token()
                logger.info("Access token refreshed.")

            # Get the HTTP method, URL, and optionally data from the function
            http_method, url, data = func(self, *args, **kwargs)

            paginate = bool(_requested_page_size(self, args, kwargs))
            results = []
            response_data = {}
            retries = 0

            while url:
                response = requests.request(
                    http_method, url, headers=self.headers, json=data, timeout=_REQUEST_TIMEOUT_SECONDS
                )

                if response.status_code != 200:
                    # handle_error either raises or has already slept and wants a retry.
                    handle_error(response)
                    retries += 1
                    if retries > _MAX_RETRIES:
                        raise ValueError(f"API request to {url} is still failing after {_MAX_RETRIES} retries.")
                    continue

                retries = 0  # the budget applies per request, not per call
                response_json = response.json()
                if not paginate:
                    response_data = response_json  # Capture the full response for consistency
                    break

                results.extend(response_json.get('value', []))
                url = response_json.get('@odata.nextLink', None)

            if paginate:
                response_data = {'value': results}

            return response_data

        except Exception:
            logger.exception("An error occurred during the API call.")
            raise

    return wrapper
class SharePointClient:
    """
    Small Microsoft Graph client for one SharePoint site: authenticates with MSAL client credentials,
    looks up the site's drive, lists folder contents and downloads files.
    """
    # Token state; replaced on every call to retrieve_access_token().
    access_token = None
    access_token_request_timestamp = None
    access_token_expiry = None
    headers = None

    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    # The token is treated as expired this many seconds before its real expiry.
    TOKEN_EXPIRY_MARGIN_SECONDS = 20
    # Socket timeout (seconds) for file downloads.
    DOWNLOAD_TIMEOUT_SECONDS = 60

    def __init__(self, tenant_id, client_id, client_secret, site_id, access_token=None,
                 access_token_expiration_details=None):
        """
        Initializes the SharePointClient with necessary credentials and site information.
        :param tenant_id: The tenant ID.
        :param client_id: The client ID.
        :param client_secret: The client secret.
        :param site_id: The site ID.
        :param access_token: The access token (optional)
        :param access_token_expiration_details: The access token expiration details (optional)
        :raises ValueError: If an access token is given without its expiration details.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

        if access_token:
            if not access_token_expiration_details:
                raise ValueError("Access token expiration details must be provided.")
            self.access_token = access_token
            self.set_access_token_expiration_details(access_token_expiration_details)
            self.headers = {
                'Authorization': f"Bearer {self.access_token['access_token']}"
            }
        else:
            self.retrieve_access_token()

        # Retrieve static identifiers
        self.site_id = site_id
        self.document_drive = self.get_documents_drive()

    def get_token_expiration_details(self):
        """
        Returns the access token expiration details. Converts the datetime objects to strings for serialization.
        :return: Dictionary with 'access_token_request_timestamp' and 'access_token_expiry' as strings.
        """
        return {
            'access_token_request_timestamp': self.access_token_request_timestamp.strftime(self.TIMESTAMP_FORMAT),
            'access_token_expiry': self.access_token_expiry.strftime(self.TIMESTAMP_FORMAT),
        }

    def set_access_token_expiration_details(self, access_token_expiration_details):
        """
        Sets the access token expiration details from a serialized dictionary.
        :param access_token_expiration_details: The serialized access token expiration details, as produced by
            get_token_expiration_details().
        :return: None
        """
        self.access_token_request_timestamp = datetime.strptime(
            access_token_expiration_details['access_token_request_timestamp'], self.TIMESTAMP_FORMAT
        )
        self.access_token_expiry = datetime.strptime(
            access_token_expiration_details['access_token_expiry'], self.TIMESTAMP_FORMAT
        )

    def is_access_token_expired(self):
        """
        Checks if the access token has expired. This method only reports the state; it does not fetch a new token.
        :return: True if expired, False otherwise.
        """
        return datetime.now() >= self.access_token_expiry

    def retrieve_access_token(self, refresh=False):
        """
        Implements authentication using MSAL.
        :param refresh: If True, force a refresh of the access token.
        :return: None
        :raises ValueError: If no access token could be obtained.
        """
        app = ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret
        )

        scope = ["https://graph.microsoft.com/.default"]

        # Taken before the request so the computed expiry errs on the early side.
        access_token_request_timestamp = datetime.now()

        if refresh:
            logger.info("Forcing refresh of access token.")
            token = app.acquire_token_for_client(scopes=scope)
        else:
            # Check if a token is already cached
            token = app.acquire_token_silent(scope, account=None)

            if not token:
                token = app.acquire_token_for_client(scopes=scope)

        if not token or "access_token" not in token:
            failure = token or {}
            logger.error(
                "Authentication failed. error=%s description=%s",
                failure.get("error"), failure.get("error_description")
            )
            raise ValueError("Authentication failed")

        access_token_expiry = access_token_request_timestamp + timedelta(
            seconds=token['expires_in'] - self.TOKEN_EXPIRY_MARGIN_SECONDS
        )

        self.access_token = token
        self.access_token_request_timestamp = access_token_request_timestamp
        self.access_token_expiry = access_token_expiry
        self.headers = {
            'Authorization': f"Bearer {self.access_token['access_token']}"
        }

        logger.info("Access token retrieved successfully.")

    @api_call_decorator
    def get_documents_drive(self):
        """
        Get the document drive of the SharePoint site.
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
        logger.info(f"Getting document drive from URL: {url}")
        return 'GET', url, None

    @api_call_decorator
    def list_folder_contents(self, drive_id, folder_path: str, page_size: int = 100):
        """
        This function will list the contents of a folder in SharePoint.
        :param drive_id: The ID of the drive.
        :param folder_path: The path of the folder.
        :param page_size: The number of items per page (default is 100).
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{folder_path}:/children?$top={page_size}"
        logger.info(f"Listing folder contents from URL: {url}")
        return 'GET', url, None

    @staticmethod
    def download_sharepoint_file(download_url):
        """
        Downloads a file from the given URL and returns its content.

        :param download_url: The URL to download the file from.
        :return: A BytesIO holding the downloaded file, positioned at the start.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        file_content = BytesIO()

        # The context manager releases the streamed connection even if reading fails.
        with requests.get(
            download_url, stream=True, timeout=SharePointClient.DOWNLOAD_TIMEOUT_SECONDS
        ) as response:
            response.raise_for_status()  # Check if the request was successful

            # Read the file content into memory
            for chunk in response.iter_content(chunk_size=8192):
                file_content.write(chunk)

        file_content.seek(0)  # Reset the file pointer to the beginning

        return file_content


def _find_download_url(items, file_name):
    """Return the download URL of the listed item called *file_name*, or None if there is none."""
    return next(
        (
            item['@microsoft.graph.downloadUrl'] for item in items
            if '@microsoft.graph.downloadUrl' in item and item['name'] == file_name
        ),
        None
    )


def _summarise_survey_outcomes(df):
    """
    Build the per-surveyor and per-week access tables from one survey outcomes sheet.
    :param df: DataFrame with the columns "Week Commencing", "DEA/REA", "Funding Type" and "Outcome".
    :return: Tuple (surveyor_outcomes, weekly_access) of DataFrames.
    """
    df = df.copy()
    df["Outcome"] = df["Outcome"].str.strip().str.lower()

    # We cannot group by funding type accurately because any job that is not funded will have a NaN value
    # and therefore we have a 100% access rate for funded jobs and 0% otherwise
    rows = []
    # Two grouping columns produce two-element keys: (week, surveyor).
    for (week, surveyor), group in df.groupby(["Week Commencing", "DEA/REA"]):
        funding_values = [x for x in group["Funding Type"].unique() if not pd.isnull(x)]
        funding_type = " + ".join(str(x) for x in funding_values) if funding_values else "No Funding"

        outcome = group["Outcome"]
        rows.append(
            {
                "Surveyor": surveyor,
                "Week": week,
                "Funding": funding_type,
                "Surveyed": int((outcome == "surveyed").sum()),
                "No Answer": int((outcome == "no answer").sum()),
                "Other Issue": int((~outcome.isin(["surveyed", "no answer"])).sum()),
            }
        )

    # Explicit columns keep the frame usable when the sheet has no rows.
    surveyor_outcomes = pd.DataFrame(
        rows, columns=["Surveyor", "Week", "Funding", "Surveyed", "No Answer", "Other Issue"]
    )
    surveyor_outcomes["Week"] = pd.to_datetime(surveyor_outcomes["Week"])

    weekly_access = (
        surveyor_outcomes.drop(columns=["Surveyor"]).groupby(["Week", "Funding"]).sum().reset_index()
    )

    # Sort by week and surveyor ascending
    surveyor_outcomes = surveyor_outcomes.sort_values(["Week", "Surveyor"], ascending=[True, True])
    surveyor_outcomes["Access Rate"] = 100 * surveyor_outcomes["Surveyed"] / (
        surveyor_outcomes["Surveyed"] + surveyor_outcomes["No Answer"] + surveyor_outcomes["Other Issue"]
    )

    weekly_access["Total"] = (
        weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
    )
    weekly_access["Access Rate"] = 100 * weekly_access["Surveyed"] / weekly_access["Total"]

    return surveyor_outcomes, weekly_access


def app():
    """
    Download each customer's survey outcomes workbook from SharePoint and compute access rates.

    Only locations that point at a single .xlsx file are processed; folder locations and customers
    without a location are skipped.
    :return: List with one dictionary per processed customer, holding the keys "customer",
        "surveyor_outcomes" and "weekly_access".
    :raises ValueError: If a required environment variable is missing or a workbook cannot be found.
    """
    # Customers for WC 18/11/2024
    #
    # ----- Eastlight location -----
    # No data this week, low on data
    # Housing Associations/Eastlight/Survey Outcomes/
    #
    # ----- Settle location -----
    # No data this week, in separate files
    # Housing Associations/Settle/Survey Outcomes/
    #
    # ----- Community Housing -----
    # In separate files - will we get to a singular form?
    # Housing Associations/Community Housing/Survey Outcomes/
    #
    # ----- ACIS location -----
    # Doesn't have this week's data
    # Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx
    #
    # ----- Southern location -----
    #
    #
    # ------ Unitas location ------
    # Does have this week's data
    # Unitas location: Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx

    locations = {
        "Unitas": "Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx",
        "Eastlight": "Housing Associations/Eastlight/Survey Outcomes/",
        "Settle": "Housing Associations/Settle/Survey Outcomes/",
        "Community Housing": "Housing Associations/Community Housing/Survey Outcomes/",
        # NOTE(review): "Housing Asociation" differs from the "Housing Associations" root used by the
        # other customers - confirm against the real SharePoint folder name before changing it.
        "ACIS": "Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx",
        "Southern": None,
    }

    settings = {
        name: os.getenv(name, None)
        for name in (
            "SHAREPOINT_CLIENT_ID",
            "SHAREPOINT_CLIENT_SECRET",
            "SHAREPOINT_TENANT_ID",
            "WARMFRONT_SHAREPOINT_SITE_ID",
        )
    }
    missing = [name for name, value in settings.items() if not value]
    if missing:
        # Fail before authenticating with "None" baked into the request URLs.
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    sharepoint_client = SharePointClient(
        tenant_id=settings["SHAREPOINT_TENANT_ID"],
        client_id=settings["SHAREPOINT_CLIENT_ID"],
        client_secret=settings["SHAREPOINT_CLIENT_SECRET"],
        site_id=settings["WARMFRONT_SHAREPOINT_SITE_ID"]
    )

    results = []
    for customer, location in locations.items():
        if location is None:
            continue

        if not location.endswith(".xlsx"):
            logger.info("Skipping %s: location is a folder, not a single workbook.", customer)
            continue

        # List the contents of the folder that holds the workbook
        contents = sharepoint_client.list_folder_contents(
            drive_id=sharepoint_client.document_drive["id"],
            folder_path=os.path.dirname(location)
        )

        download_url = _find_download_url(contents["value"], os.path.basename(location))
        if download_url is None:
            raise ValueError("File not found in the SharePoint folder.")

        file_content = sharepoint_client.download_sharepoint_file(download_url)

        # Convert to pandas dataframe since file is an excel file
        df = pd.read_excel(file_content)
        surveyor_outcomes, weekly_access = _summarise_survey_outcomes(df)

        results.append(
            {
                "customer": customer,
                "surveyor_outcomes": surveyor_outcomes,
                "weekly_access": weekly_access,
            }
        )

    return results
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
PORTFOLIO_ID = 121
USER_ID = 8

# Workbook read when app() is called without an explicit path.
DEFAULT_ASSET_LIST_PATH = (
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Cottons/Cottons Asset List EPC Data Pull.xlsx"
)


def app(asset_list_path=DEFAULT_ASSET_LIST_PATH):
    """
    Prepares the inputs to produce the remote assessments for Cottons

    Reads the asset list, keeps the EPC D/E properties, looks up the newest EPC and the Find My EPC data
    for each of them, stores the asset list and the non-invasive recommendations in S3 and prints the
    request body that refers to both files.
    :param asset_list_path: Path of the asset list workbook (defaults to the original local path).
    :return: The request body dictionary that is also printed.
    """

    # Read in the asset list
    cottons_asset_list = pd.read_excel(asset_list_path)
    # A number are missing EPCs due to the space in the postcode
    # Breakdowns:
    # C 119
    # D 106
    # E 26
    # B 5
    #
    # Take the EPC D/E properties
    asset_list = cottons_asset_list[
        cottons_asset_list["EPC rating on register"].isin(["D", "E"])
    ]
    asset_list = asset_list.reset_index(drop=True)
    asset_list["row_id"] = asset_list.index
    asset_list["uprn"] = asset_list["uprn"].astype(int)

    extracted_data = []
    missing_epc = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        # Retrieve the EPC data
        epc_searcher = SearchEpc(
            address1=home["address1"],
            postcode=home["postcode"], uprn=home["uprn"], auth_token=EPC_AUTH_TOKEN, os_api_key=""
        )
        epc_searcher.find_property(skip_os=True)

        newest_epc = epc_searcher.newest_epc
        if newest_epc is None:
            # No EPC found for this home: there is no address to look up, so skip it
            # instead of failing on the subscript below.
            missing_epc.append(home["uprn"])
            continue

        find_epc_searcher = RetrieveFindMyEpc(address=newest_epc["address1"],
                                              postcode=newest_epc["postcode"])
        find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
        time.sleep(0.5)
        # We need uprn

        extracted_data.append(
            {
                "uprn": home["uprn"],
                **find_epc_data,
            }
        )

    if missing_epc:
        print(f"No EPC found for {len(missing_epc)} properties: {missing_epc}")

    non_invasive_recommendations = [
        {
            "uprn": r["uprn"],
            "recommendations": r["recommendations"]
        } for r in extracted_data
    ]

    filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
    save_csv_to_s3(
        dataframe=asset_list,
        bucket_name="retrofit-plan-inputs-dev",
        file_name=filename
    )

    # Store the non-invasive recommendations in s3
    non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
    save_csv_to_s3(
        dataframe=pd.DataFrame(non_invasive_recommendations),
        bucket_name="retrofit-plan-inputs-dev",
        file_name=non_invasive_recommendations_filename
    )

    body = {
        "portfolio_id": str(PORTFOLIO_ID),
        "housing_type": "Social",
        "goal": "Increasing EPC",
        "goal_value": "C",
        "trigger_file_path": filename,
        "already_installed_file_path": "",
        "patches_file_path": "",
        "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
        "valuation_file_path": "",
        "scenario_name": "Wave 3 Packages",
        "multi_plan": True,
        "budget": None,
    }
    print(body)
    return body
b/etl/route_march_data_pull/app.py index b53b36c2..0f3e0068 100644 --- a/etl/route_march_data_pull/app.py +++ b/etl/route_march_data_pull/app.py @@ -30,9 +30,12 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column): postcode = home[postcode_column] house_number = home[address1_column] full_address = home[fulladdress_column] + house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode) + if house_no is None: + house_no = house_number searcher = SearchEpc( - address1=str(house_number), + address1=str(house_no), postcode=postcode, auth_token=EPC_AUTH_TOKEN, os_api_key="", @@ -46,6 +49,34 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column): searcher.ordnance_survey_client.built_form = None searcher.find_property(skip_os=True) + + # Check if we have a flat or appartment + if searcher.newest_epc is None: + # Try again: + if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None: + # Backup + add1 = full_address.split(",")[1].strip() + else: + add1 = str(house_number) + searcher = SearchEpc( + address1=add1, + postcode=postcode, + auth_token=EPC_AUTH_TOKEN, + os_api_key="", + property_type=None, + fast=True, + full_address=full_address, + max_retries=5 + ) + + if ( + "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in + house_number.lower() + ): + searcher.ordnance_survey_client.property_type = "Flat" + + searcher.find_property(skip_os=True) + if searcher.newest_epc is None: no_epc.append(home["row_id"]) continue