Model/etl/customers/slide_utils.py

370 lines
13 KiB
Python

from pptx.enum.text import PP_ALIGN # NOQA
from pptx import Presentation
from pptx.util import Inches, Pt
import matplotlib.pyplot as plt
from sqlalchemy.orm import Session
from sqlalchemy.sql import true
from backend.app.db.utils import row2dict
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
from backend.app.db.models.recommendations import Recommendation
from backend.app.db.models.recommendations import PlanModel
from backend.app.utils import sap_to_epc
EPC_COLOURS = {
"A": "#028051",
"B": "#14b759",
"C": "#8ecd46",
"D": "#fdd401",
"E": "#fdab67",
"F": "#ee8023",
"G": "#e71437",
}
def get_properties_with_default_recommendations(session: Session, portfolio_id: int):
"""
Fetch properties for a given portfolio_id along with their default recommendations,
ensuring that all properties are retrieved even if they don't have recommendations
where default is True.
:param session: The SQLAlchemy session used to execute the query.
:param portfolio_id: The ID of the portfolio for which to retrieve properties and recommendations.
:return: A list of dictionaries, where each dictionary represents a property including
its associated default recommendations if any.
"""
# Adjust the join to correctly filter recommendations while including all properties
query = (
session.query(PropertyModel, Recommendation)
.outerjoin(
Recommendation,
(Recommendation.property_id == PropertyModel.id)
& (Recommendation.default == true()),
)
.filter(PropertyModel.portfolio_id == portfolio_id)
.all()
)
properties = {}
for property, recommendation in query:
# Ensure the property is added once with an empty list of recommendations initially
if property.id not in properties:
properties[property.id] = row2dict(property)
properties[property.id]["recommendations"] = []
# Append recommendations if they exist and meet the criteria (already filtered by the query)
if recommendation and recommendation.default:
properties[property.id]["recommendations"].append(row2dict(recommendation))
return list(properties.values())
def get_property_details_by_portfolio_id(session: Session, portfolio_id: int):
"""
This function retrieves all property details associated with a given portfolio_id.
:param session: The SQLAlchemy session used to execute the query.
:param portfolio_id: The ID of the portfolio for which to retrieve property details.
:return: A list of dictionaries, where each dictionary represents a property's details.
Returns an empty list if no property details are found.
"""
property_details = (
session.query(PropertyDetailsEpcModel)
.filter(PropertyDetailsEpcModel.portfolio_id == portfolio_id)
.all()
)
# Convert the SQLAlchemy objects to dictionaries
property_details_dict = (
[row2dict(pd) for pd in property_details] if property_details else []
)
return property_details_dict
def get_plan_by_portfolio_id(session: Session, portfolio_id: int):
"""
This function retrieves all plans associated with a given portfolio_id.
:param session: The SQLAlchemy session used to execute the query.
:param portfolio_id: The ID of the portfolio for which to retrieve plans.
:return: A list of dictionaries, where each dictionary represents a plan.
Returns an empty list if no plans are found.
"""
plans = (
session.query(PlanModel).filter(PlanModel.portfolio_id == portfolio_id).all()
)
# Convert the SQLAlchemy objects to dictionaries
plans_dict = [row2dict(plan) for plan in plans] if plans else []
return plans_dict
def plot_epc_distribution(
df,
customer_key,
title="Your Units",
background_color="white",
bar_height=0.4,
font_size=15,
):
"""
Plots a horizontal bar chart of EPC rating distribution with adjustable bar thickness and text sizes.
Allows setting the plot background color and dynamically adjusts text size and bar spacing.
:param df: DataFrame with columns ['current_epc_rating', 'count', 'percentage']
:param title: Title of the plot
:param background_color: Background color of the plot
:param bar_height: Thickness of the bars (default 0.4)
:param font_size: Base font size for text annotations (default 15)
"""
# Calculate dynamic figure size or adjust based on preferences
square_size = max(
6, len(df) * 0.6
) # Ensure minimum size and adjust based on number of entries
fig, ax = plt.subplots(figsize=(square_size, square_size))
fig.patch.set_facecolor(background_color) # Set figure background color
ax.set_facecolor(background_color) # Set axes background color
df["percentage"] = df["percentage"].round(
1
) # Round the percentage values to 1 decimal place
df_sorted = df.sort_values("percentage", ascending=True)
# Plot bars with specified height for adjustable thickness
bars = ax.barh(
df_sorted["current_epc_rating"],
df_sorted["percentage"],
color=df_sorted["current_epc_rating"].map(EPC_COLOURS),
edgecolor="none",
height=bar_height,
)
epc_rating_font_size = (
font_size * 2
) # EPC rating font size larger than base font size
count_percentage_font_size = (
font_size # Count (percentage) font size as base font size
)
# Annotate bars with EPC ratings inside and count with percentage values outside
for index, bar in enumerate(bars):
width = bar.get_width()
epc_rating = df_sorted.iloc[index]["current_epc_rating"]
count = df_sorted.iloc[index]["count"]
percentage = df_sorted.iloc[index]["percentage"]
# EPC rating inside the bar with increased font size
ax.text(
width - (width * 0.05),
bar.get_y() + bar.get_height() / 2,
f"{epc_rating}",
va="center",
ha="right",
color="white",
fontsize=epc_rating_font_size,
)
# Count and percentage outside the bar, original font size
ax.text(
width + 1,
bar.get_y() + bar.get_height() / 2,
f"{count} ({percentage}%)",
va="center",
color="black",
fontsize=count_percentage_font_size,
)
ax.set_title(
title, fontsize=font_size * 1.2
) # Adjust title font size proportionally
ax.tick_params(
axis="x", which="both", bottom=False, top=False, labelbottom=False
) # Remove x-axis tick marks and values
ax.tick_params(
axis="y", which="both", left=False, right=False, labelleft=False
) # Remove y-axis tick marks and labels
ax.spines["top"].set_visible(False) # Remove top spine
ax.spines["right"].set_visible(False) # Remove right spine
ax.spines["left"].set_visible(False) # Remove left spine
ax.spines["bottom"].set_visible(False) # Remove bottom spine
plt.tight_layout() # Adjust layout
plt.show()
# Save the figure as an image
figure_path = f"etl/customers/{customer_key}/epc_distribution_plot.png"
fig.savefig(figure_path, bbox_inches="tight")
plt.close(fig) # Close the figure to free memory
return fig, figure_path
def save_plot_to_image(figure, path="plot.png"):
"""
Saves a matplotlib figure to an image file for insertion into PowerPoint.
"""
figure.savefig(path, bbox_inches="tight")
plt.close(figure)
def save_figure_as_image(figure, filename="temp_plot.png"):
"""
Saves a matplotlib figure to an image file.
"""
figure.savefig(filename, dpi=300)
plt.close(
figure
) # Close the figure to prevent it from displaying in notebooks or Python environments
def add_commentary_with_bullets(
slide,
commentary,
top_inches,
left_inches=Inches(1),
width_inches=Inches(8),
height_inches=Inches(2),
):
"""
Adds commentary with bullet points to a slide.
:param slide: The slide object to add the commentary to.
:param commentary: The commentary text, with sections separated by newlines for bullet points.
:param top_inches: The top position of the commentary text box.
:param left_inches: The left position of the commentary text box.
:param width_inches: The width of the commentary text box.
:param height_inches: The height of the commentary text box.
"""
txBox = slide.shapes.add_textbox(
left_inches, top_inches, width_inches, height_inches
)
tf = txBox.text_frame
# Configure text frame
tf.word_wrap = True
tf.auto_size = True
tf.paragraphs[0].alignment = PP_ALIGN.LEFT
# Split the commentary into sections for bullet points
sections = commentary.split("\n")
for i, section in enumerate(sections):
if i > 0:
p = (
tf.add_paragraph()
) # Add a new paragraph for each section after the first
else:
p = tf.paragraphs[0] # Use the first paragraph for the first section
p.text = section
p.space_after = Pt(14) # Adjust space after each bullet point as needed
p.font.size = Pt(14) # Adjust font size as needed
p.level = 0 # Bullet level, can be adjusted for nested bullets
p.space_before = Pt(0)
def add_slide_with_image(prs, title, img_path=None, commentary=None):
"""
Adds a slide with an image (if provided) and optional commentary. If no image is provided,
places the commentary text in the middle of the slide.
"""
slide_layout = prs.slide_layouts[5] # Title and Content layout
slide = prs.slides.add_slide(slide_layout)
title_placeholder = slide.shapes.title
title_placeholder.text = title
# Determine the position of the commentary text box based on whether an image is included
if img_path:
# Add the image
slide.shapes.add_picture(
img_path, Inches(1), Inches(1.5), Inches(8), Inches(4.5)
)
# Position for commentary when image is present
commentary_top = Inches(6)
else:
# Position for commentary when image is not present (centered vertically)
commentary_top = Inches(3)
# Add commentary if provided
if commentary:
add_commentary_with_bullets(slide, commentary, commentary_top)
def create_powerpoint(data, save_location):
"""
Creates a PowerPoint presentation based on provided data and optional commentaries.
:param data: A dictionary containing the data needed for each slide.
:param save_location: The file path where the PowerPoint presentation will be saved.
"""
prs = Presentation()
for slide, slide_data in data.items():
slide_figure_path = data[slide].get("image_path")
text = data[slide].get("text")
title = data[slide].get("title", "")
add_slide_with_image(prs, title, slide_figure_path, text)
# Save the presentation
prs.save(save_location)
def create_recommendations_summary(
recommendations_df, properties_df, property_details_df, sap_target
):
# Aggregate the impact of the recommendations
# We want:
# Total number of sap points
# total valuation impact
# total bill savings
# total cost
# Total Co2 impact
recommendations_summary = (
recommendations_df.groupby(["property_id"])
.agg(
total_sap_points=("sap_points", "sum"),
total_valuation_impact=("property_valuation_increase", "sum"),
total_bill_savings=("energy_cost_savings", "sum"),
total_cost=("estimated_cost", "sum"),
total_carbon=("co2_equivalent_savings", "sum"),
adjusted_heat_demand=("adjusted_heat_demand", "sum"),
)
.reset_index()
)
# Merge on current sap points, current CO2, current adjusted_heat_demand, current annual bill
recommendations_summary = recommendations_summary.merge(
properties_df[["id", "uprn", "current_sap_points"]].rename(
columns={"id": "property_id"}
),
on="property_id",
how="left",
)
recommendations_summary["expected_sap_points"] = (
recommendations_summary["current_sap_points"]
+ recommendations_summary["total_sap_points"]
)
recommendations_summary["expected_epc_rating"] = recommendations_summary[
"expected_sap_points"
].apply(lambda x: sap_to_epc(x))
recommendations_summary["sap_difference"] = (
sap_target - recommendations_summary["expected_sap_points"]
)
if property_details_df is not None:
recommendations_summary = recommendations_summary.merge(
property_details_df[
["uprn", "co2_emissions", "adjusted_energy_consumption", "energy_bill"]
].rename(
columns={
"id": "property_id",
"co2_emissions": "current_co2",
"adjusted_energy_consumption": "current_energy",
"energy_bill": "current_energy_bill",
}
),
on="uprn",
how="left",
)
return recommendations_summary