Model/etl/costs/app.py
2025-08-17 17:45:25 +01:00

143 lines
5.1 KiB
Python

import os
import dotenv
import pandas as pd
import numpy as np
from pathlib import Path
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from backend.app.db.models.materials import Material
from recommendations.recommendation_utils import calculate_r_value_per_mm
import inspect
src_file_path = inspect.getfile(lambda: None)
DATA_DIRECTORY = Path(src_file_path).parent / "local_data" / "20250815 Domna Materials.xlsx"
# Environment file is at the same level as this file
ENV_FILE = Path(src_file_path).parent / "etl" / "costs" / ".env"
dotenv.load_dotenv(ENV_FILE)
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')
def push_costs_to_db(engine, costs_df):
"""
Push costs DataFrame to the database.
:param engine: The SQLAlchemy engine connected to your database.
:param costs_df: The DataFrame containing cost data.
"""
materials = []
for _, row in costs_df.iterrows():
row_dict = row.to_dict()
# Add other necessary transformations here
# Create Material object and add it to the list
materials.append(Material(**row_dict))
# Use SQLAlchemy session for bulk insert
with Session(engine) as session:
session.bulk_save_objects(materials)
session.commit()
def set_current_costs_inactive(engine):
"""
Set all current costs to inactive in the database.
:param engine: The SQLAlchemy engine connected to your database.
"""
with Session(engine) as session:
session.query(Material).update({Material.is_active: False})
session.commit()
def app():
"""
This application uploads the cost data to our database
The most recent cost data can be found in OneDrive, in the
shared folder > 04. Product Development > Cost data > Hestia Materials.xlsx
For the moment, the data is uploaded manually. In the future, we will automate this so the data can be
stored locally and then is uploaded from the local_data folder
:return:
"""
connection_string = "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
db_string = connection_string.format(
drivername="psycopg2", # You'll need to use psycopg2 driver for PostgreSQL
username=DB_USERNAME,
password=DB_PASSWORD,
server=DB_HOST,
port=DB_PORT,
dbname=DB_NAME,
)
db_engine = create_engine(db_string, pool_size=5, max_overflow=5)
cwi_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="cavity_wall_insulation", header=0)
ventilation_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="Ventilation", header=0)
loft_insulation_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="loft_insulation", header=0)
iwi_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="internal_wall_insulation", header=0)
suspended_floor_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="suspended_floor_insulation", header=0)
solid_floor_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="solid_floor_insulation", header=0)
ewi_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="external_wall_insulation", header=0)
lel_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="low_energy_lighting", header=0)
flat_roof_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="flat_roof_insulation", header=0)
window_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="window_glazing", header=0)
rir_insulation_costs = pd.read_excel(DATA_DIRECTORY, sheet_name="room_roof_insulation", header=0)
solar_pv = pd.read_excel(DATA_DIRECTORY, sheet_name="solar_pv", header=0)
hhrsh = pd.read_excel(DATA_DIRECTORY, sheet_name="hhrsh", header=0)
scaffolding = pd.read_excel(DATA_DIRECTORY, sheet_name="scaffolding", header=0)
fireplaces = pd.read_excel(DATA_DIRECTORY, sheet_name="fireplaces", header=0)
# Form a single table to be uploaded
costs = pd.concat(
[
cwi_costs,
ventilation_costs,
loft_insulation_costs,
iwi_costs,
suspended_floor_costs,
solid_floor_costs,
ewi_costs,
lel_costs,
flat_roof_costs,
window_costs,
rir_insulation_costs,
solar_pv,
hhrsh,
scaffolding,
fireplaces
]
)
costs = costs.replace({np.nan: None})
costs["depth"] = costs["depth"].fillna(0)
costs["depth"] = costs["depth"].astype(str)
costs["r_value_per_mm"] = costs.apply(
lambda row: calculate_r_value_per_mm(float(row["depth"]), row["thermal_conductivity"]), axis=1
)
costs["r_value_unit"] = "square_meter_kelvin_per_watt"
for col in ["material_cost", "labour_cost", "labour_hours_per_unit", "plant_cost"]:
costs[col] = costs[col].fillna(0)
# Push the costs to the database
# Since this is just uploading all of the new costs to the database, we make all of the current costs inactive
print("Setting all current costs to inactive")
set_current_costs_inactive(db_engine)
print("Pushing costs to db")
push_costs_to_db(db_engine, costs)
if __name__ == "__main__":
app()