completing source_epc_properties

This commit is contained in:
Khalim Conn-Kowlessar 2024-08-19 11:20:38 +01:00
parent c6ebcedfce
commit 6f053a20d1
2 changed files with 58 additions and 23 deletions

View file

@ -33,7 +33,8 @@ class Ownership:
epc_paths: List[str],
domestic_ownership_path: str,
overseas_ownership_path: str,
land_registry_path: str
land_registry_path: str,
project_name: str
):
"""
@ -45,6 +46,7 @@ class Ownership:
:param overseas_ownership_path: A string which points to the location of the OCOD ownership data, that details
corporate ownership of properties in the UK, where the companies are overseas
:param land_registry_path: A string that points to the location of the land registry data
:param project_name: A string that is used to identify the project
"""
# All epc paths should end with certificates.csv
@ -57,6 +59,9 @@ class Ownership:
self.run_timestamp = str(datetime.now())
# Data storage paths
self.epc_data_filepath = f"ownership/{project_name}/{self.run_timestamp}/epc_data.xlsx"
# Data
self.epc_data = None
self.ownership_data = None
@ -76,47 +81,44 @@ class Ownership:
def source_epc_properties(self, column_filters=None):
    """
    Build the newest-EPC-per-UPRN dataset from every EPC file, filter it, and store it in S3 as an Excel file.

    Each path in self.epc_paths is read as a CSV. Rows without a UPRN are dropped, only the most recently
    lodged certificate is kept for each UPRN, and then the column filters are applied. The per-file results
    are concatenated into self.epc_data, which is uploaded to the "epc_data" bucket under
    self.epc_data_filepath.

    :param column_filters: Dictionary with column names as keys and list of acceptable values as values. This
                           dictionary is used to filter the EPC data and should look like this:
                           {"column_name": ["value1", "value2", ...]}, where column_name is the name of the column
                           in the EPC data and ["value1", "value2", ...] is a list of acceptable values for that
                           column. If None (the default), no filtering is applied.
    :raises Exception: If LODGEMENT_DATETIME contains values that cannot be parsed as datetimes, or if a
                       column named in column_filters is not found in the EPC data.
    """
    column_filters = {} if column_filters is None else column_filters

    data = []
    for path in tqdm(self.epc_paths):
        epc_data = pd.read_csv(path, low_memory=False)

        # Records without a UPRN cannot be de-duplicated per property, so they are dropped
        epc_data = epc_data[~pd.isnull(epc_data["UPRN"])]
        # Normalise UPRN to a string of digits (going via int - presumably to avoid a trailing ".0" when the
        # column was read as float; confirm against the raw data)
        epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str)

        # Parse once, and refuse to continue if any value could not be parsed (coerced to NaT)
        lodgement_datetime = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], errors="coerce")
        if lodgement_datetime.isnull().any():
            raise Exception("Lodgement datetime contains invalid data")
        epc_data["LODGEMENT_DATETIME"] = lodgement_datetime

        # Get the newest EPC for each UPRN: after sorting newest-first, drop_duplicates keeps the first
        # (i.e. most recent) row per UPRN
        epc_data = epc_data.sort_values(
            ["LODGEMENT_DATE", "LODGEMENT_DATETIME"], ascending=False
        ).drop_duplicates("UPRN")

        # Apply column filters
        for column, values in column_filters.items():
            if column not in epc_data.columns:
                raise Exception(f"Column {column} not found in data. column_filters is malformed")
            epc_data = epc_data[epc_data[column].isin(values)]

        data.append(epc_data)

    self.epc_data = pd.concat(data, ignore_index=True)

    # We now store the data in s3
    save_excel_to_s3(
        df=self.epc_data,
        bucket_name="epc_data",
        file_key=self.epc_data_filepath
    )
def load_company_ownership(self):
"""

View file

@ -229,6 +229,39 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True):
return df
def save_excel_to_s3(df, bucket_name, file_key):
    """
    Save a pandas DataFrame as an Excel file on S3.

    :param df: DataFrame to save. Must not be empty.
    :param bucket_name: S3 bucket name.
    :param file_key: S3 file key. This includes the file name and path, and must end with an Excel file
                     extension (.xls or .xlsx, checked case-insensitively).
    :raises ValueError: If the DataFrame is empty or file_key does not end with an Excel file extension.
    """
    # Ensure the DataFrame is not empty
    if df.empty:
        raise ValueError("The DataFrame is empty. Nothing to save to Excel.")

    # Ensure the file_key ends with an appropriate Excel file extension (".XLSX" is as valid as ".xlsx")
    if not file_key.lower().endswith((".xls", ".xlsx")):
        raise ValueError("The specified file key does not appear to be an Excel file.")

    # Write the workbook into an in-memory buffer rather than a temporary file
    output = BytesIO()
    df.to_excel(output, index=False)
    output.seek(0)  # Important: move back to the beginning of the buffer before it is read for upload

    # Initialize a session using boto3
    session = boto3.session.Session()
    s3 = session.resource("s3")

    # Upload the Excel file from the buffer to S3
    s3.Bucket(bucket_name).put_object(Body=output, Key=file_key)

    logger.info("Excel file saved to S3 bucket '%s' with key '%s'", bucket_name, file_key)
def read_csv_from_s3(bucket_name, filepath):
s3 = boto3.client('s3')