Recolved merge conflicts

This commit is contained in:
Khalim Conn-Kowlessar 2023-08-25 12:15:19 +01:00
commit 93beffe01b
45 changed files with 1073 additions and 688 deletions

View file

@ -1 +1,14 @@
model_data/local_data/*
backend/tests/*
backend/node_modules/*
recommendations/tests/*
model_data/tests/*
infrastructure/*
data_collection/*
node_modules/*
conservation_areas/*
open_uprn/*
land_registry/*
pytest.ini
*/README.md
utils/tests/*

View file

@ -20,7 +20,6 @@ jobs:
- name: Install Serverless and plugins
run: |
npm install -g serverless
npm install -g serverless-python-requirements
npm install -g serverless-domain-manager
- name: Install dependencies
@ -59,16 +58,30 @@ jobs:
echo "::set-output name=db_port::${{ secrets[format('{0}_DB_PORT', github.ref_name)] }}"
echo "::set-output name=db_name::${{ secrets[format('{0}_DB_NAME', github.ref_name)] }}"
# - name: Build Lambda Layer
- name: Set ECR credentials
id: set_ecr_credentials
run: |
echo "::set-output name=ecr_uri::${{ secrets[format('{0}_ECR_URI', github.ref_name)] }}"
- name: Setup Docker
uses: docker/setup-buildx-action@v1
# - name: Setup Docker Buildx
# run: |
# cd backend
# pip install -r requirements/lambda.txt -t python
# zip -r layer.zip python
#
# - name: Publish Lambda Layer
# run: |
# LAYER_ARN=$(aws lambda publish-layer-version --layer-name LambdaDependenciesLayer --zip-file fileb://backend/layer.zip | jq -r '.LayerVersionArn')
# aws ssm put-parameter --name "/${{ github.ref_name }}/LambdaDependenciesLayerArn" --value "$LAYER_ARN" --type String --overwrite
# docker buildx create --use
- name: Build Docker Image
run: |
docker build -t fastapi-lambda-image:${{ github.sha }} -f backend/docker/lambda.Dockerfile . --load
- name: Login to ECR
run: |
aws ecr get-login-password --region eu-west-2 | docker login --username AWS --password-stdin ${{ steps.set_ecr_credentials.outputs.ecr_uri }}
- name: Tag and Push Docker Image to ECR
run: |
docker tag fastapi-lambda-image:${{ github.sha }} ${{ steps.set_ecr_credentials.outputs.ecr_uri }}:${{ github.sha }}
docker push ${{ steps.set_ecr_credentials.outputs.ecr_uri }}:${{ github.sha }}
- name: Deploy to AWS Lambda via Serverless
env:
@ -81,9 +94,11 @@ jobs:
DB_HOST: ${{ steps.set_db_credentials.outputs.db_host }}
DB_PORT: ${{ steps.set_db_credentials.outputs.db_port }}
DB_NAME: ${{ steps.set_db_credentials.outputs.db_name }}
ECR_URI: ${{ steps.set_ecr_credentials.outputs.ecr_uri }}
GITHUB_SHA: ${{ github.sha }}
run: |
# Fetch database credentials from AWS Secrets Manager
SECRET_VALUE=$(aws secretsmanager get-secret-value --secret-id dev/assessment_model/db_credentials --query SecretString)
SECRET_VALUE=$(aws secretsmanager get-secret-value --secret-id ${{ github.ref_name }}/assessment_model/db_credentials --query SecretString)
DB_USERNAME=$(echo "$SECRET_VALUE" | jq -r '. | fromjson | .db_assessment_model_username')
DB_PASSWORD=$(echo "$SECRET_VALUE" | jq -r '. | fromjson | .db_assessment_model_password')

View file

@ -10,6 +10,11 @@ part of a larger application
# Folders
### backend/
This folder contains the code for the fastapi backend service, which provides an interface to
much of the functionality in this repository, for the frontend
### model_data/
This folder contains related to the reading and preparation of

View file

@ -2,10 +2,10 @@ from datetime import datetime
import re
from epc_api.client import EpcClient
from model_data.config import EPC_AUTH_TOKEN
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
class Property(BaseUtility):
class Property(Definitions):
ATTRIBUTE_MAP = {
"floor-description": "floor",
"hotwater-description": "hotwater",

View file

@ -59,6 +59,91 @@ FastAPI automatically generates interactive API documentation for your applicati
server and visit <yourlocalurl>/docs in your browser. Alternatively, you can go to
<yourlocalurl>/redoc to view the documentation in the ReDoc format.
## Building the lambda's backend docker image locally
To build the backend docker image locally, run the following command from the root of the project directory:
```commandline
docker build -t fastapi-lambda-image:latest -f backend/docker/lambda.Dockerfile .
```
To check the size of the resulting image, run the following command:
```bash
docker images | grep fastapi-lambda-image
```
To run a shell inside the Docker container to inspect its contents, run:
```commandline
docker run -it fastapi-lambda-image:latest /bin/bash
```
Running in lambda results in running in a slightly different format compared to running the fastapi
application locally. If you want to run the fastapi application locally, in docker, we have a docker
file which builds the same environment as in lambda but runs the fast api application with uvicorn.
Run
```commandline
docker build -t fastapi-local-image:latest -f backend/docker/Dockerfile .
```
This will be the image. To run it, simply run
```commandline
docker run -p 8000:8000 -v ~/.aws:/root/.aws fastapi-local-image:latest
```
This assumes you have a ~/.aws folder with your aws credentials in it. If you don't have this, you can
run the following command with your aws access token exported into your environment.
```commandline
docker run -p 8000:8000 -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY -e AWS_DEFAULT_REGION fastapi-local-image:latest
```
## Emulating the lambda locally
I have set up a script called `run_local_lambda.sh` which will allow you to emulate the lambda locally.
You need to have a .env file with the necessary environment variables at backend/env and also
and aws credentials file at ~/.aws/credentials, locally.
To run this, firstly run:
```bash
chmod +x run_local_lambda.sh
```
Now you can run the script with
```
./run_lambda_local.sh
```
In order to make a request to it, there is a specific format the request must be in, to
emuate lambda. If using postman, the url you want is `http://localhost:8000/2015-03-31/functions/function/invocations`
and you need to pass a body like this:
```json
{
"httpMethod": "POST",
"body": "{\"portfolio_id\": 4, \"housing_type\": \"Private\", \"goal\": \"Increase EPC\", \"goal_value\": \"C\", \"trigger_file_path\": \"2/4/portfolio_plan_properties-20230724T093542483Z.csv\"}",
"path": "/v1/plan/trigger",
"resource": "/",
"headers": {
"Accept": "*/*",
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_TOKEN_HERE",
"x-api-key": "YOUR_API_KEY_HERE"
},
"requestContext": {},
"multiValueQueryStringParameters": null
}
```
```commandline
## Testing
To run tests, run the following command from the root of the project directory:
@ -178,60 +263,104 @@ mapped to the appropriate API stage.
Remember to replace api and the CloudFront domain with your actual subdomain
and CloudFront domain.
### Thoughts for authenticating the frontend with the backend
Certainly! Here's a detailed documentation for your README:
To provide secure communication between your frontend Next.js application and your backend FastAPI service, you have
several options. Here are a few popular approaches:
* * *
- JWT (JSON Web Tokens): Since you're already using JWT for authentication in the frontend, you can also use this to
authenticate requests to your FastAPI backend. This involves passing the JWT token in the Authorization header of the
request from your frontend to the backend. Then, you can use a JWT decoder on the backend to validate the token. This
can be done using libraries such as PyJWT in your FastAPI application.
Deployment Troubleshooting for `fastapi-lambda`
-----------------------------------------------
- API Keys: This is another common approach where you issue unique keys for each user/service that needs to access the
backend API. Each API call then includes this key in the request header. FastAPI can easily validate these keys. While
this approach is simpler than JWT, it provides less flexibility and security, as it doesn't allow for claims or
scopes.
### Context:
- OAuth2.0: OAuth2 is a protocol that allows applications to request authorization to access resources on behalf of a
user. FastAPI has direct support for OAuth2 using the OAuth2PasswordBearer class, which can be used for issuing access
tokens to clients. Note that this could be overkill if you're already using JWT and the calls to your backend are not
on behalf of a user.
When deploying the `fastapi-lambda` using Serverless Framework, you may encounter issues related to domain management,
especially if you're using a custom domain for your API. This documentation provides troubleshooting steps and details
on how to resolve potential conflicts.
- Mutual TLS (mTLS): Mutual TLS is a method of two-way communication encryption where both client and server
authenticate each other. This can be more complex to setup but can provide an additional layer of security in some
scenarios.
### Potential Issues & Solutions:
No matter which method we choose, you should always serve your applications over HTTPS to ensure that all data,
including tokens or keys, is encrypted during transmission.
Also, ensure that you handle the JWT tokens carefully, especially if they are stored in the client's browser, as they
could be vulnerable to Cross-Site Scripting (XSS) or Cross-Site Request Forgery (CSRF) attacks. Consider httpOnly
cookies for storing tokens if your use case allows it.
#### 1\. Conflict with Existing CloudFront Distribution:
### I think that we could use both JWT + API Key.
**Error Message**:
# Notes:
csharpCopy code
Using both JWT and API keys can provide an additional layer of security and
could be a good approach for our requirements.
`One or more aliases specified for the distribution includes an incorrectly configured DNS record that points to another CloudFront distribution.`
1. JSON Web Tokens (JWT) are useful for carrying user context between services. With JWT, you can embed user-specific
data (like user ID, roles, permissions, etc.) in a secure, tamper-proof token. This can be validated by your FastAPI
backend to authenticate and authorize the user.
2.
3. API Keys can serve as an identifier for the client application (in this case, your Next.js frontend). It can provide
a straightforward way to track and control how the client application is calling the backend API.
**Cause**: This can occur if there's an existing CNAME record in your DNS provider pointing to a CloudFront
distribution.
Here's a rough workflow of how these can be used together:
**Solution**:
A user logs in to the Next.js frontend using NextAuth and receives a JWT.
This JWT is stored securely in the client's browser.
For each request from the frontend to the backend, the JWT is included in the Authorization header.
In addition to the JWT, an API key unique to the frontend application is included in each request (possibly in a custom
header like X-API-Key).
The backend service validates both the JWT (for user authentication and authorization) and the API key (for client
application validation).
This approach provides a double check for each request:
* Check your DNS provider (e.g., Google Domains) and verify the CNAME record for `api.dev.hestia.homes`.
* Temporarily remove or update the conflicting CNAME record.
* Run the `sls create_domain` command again.
* Update the DNS settings in your DNS provider based on the new configuration provided by
the `serverless-domain-manager` plugin.
#### 2\. Conflict with Route53:
**Error Message**:
csharpCopy code
`Deleting RestApi failed. Please remove all base path mappings related to the RestApi in your domains.`
**Cause**: This can occur if there are residual AWS configurations, especially in Route53, from previous deployments.
**Solution**:
* Navigate to the AWS Route53 Console.
* Identify and delete any residual Hosted Zones or Record Sets related to `api.dev.hestia.homes`.
* Ensure that you have backed up any necessary configurations before deleting.
#### 3\. Other AWS Resources Conflicts:
You might encounter issues where AWS resources, such as S3 buckets or CloudFront distributions, are not properly deleted
or are conflicting with new deployments.
**Solution**:
* Navigate to the respective AWS service dashboard.
* Manually identify and rectify any conflicting resources. This might involve emptying S3 buckets or deleting CloudFront
distributions.
* Ensure backups and proper precautions before deleting any resources.
### Additional Notes:
* **Backup Configurations**: Always backup your configurations before making changes. This ensures that you can revert
to a previous state if needed.
* **DNS Propagation**: Remember that DNS changes can take some time to propagate globally. After making DNS changes, you
might not see immediate effects.
* **CloudFront Distributions**: If you can't find a CloudFront distribution in the AWS CloudFront console, it's possible
that it was automatically created by another AWS service like API Gateway. It might need to be managed or deleted from
that service's dashboard.
### After succesfully running creating the custom domain
After successfully creating the custom domain with the `serverless-domain-manager` plugin, you should add back the CNAME
record into Google Domains (or whatever platform is being used to manage domains now)
to ensure that the custom domain properly points to the CloudFront distribution managed by
AWS.
Here's what you should do:
1. **Log in to Google Domains**:
* Go to [Google Domains](https://domains.google.com/).
* Navigate to the management page for `hestia.homes`.
2. **Add/Update the CNAME Record**:
* Find the section for custom resource records.
* Add (or update if it already exists) a CNAME record for `api.dev`.
* Point it to the CloudFront distribution domain name (e.g., `d2d269kjy1nyhz.cloudfront.net.`). Ensure you include
the trailing dot at the end. This can be found in API gateway
3. **Check DNS Propagation**:
* Keep in mind that DNS changes might take some time to propagate. You can use online tools
like [DNS Checker](https://www.dnschecker.org/) to verify the propagation status worldwide.
* Test your API endpoint `api.dev.hestia.homes` to ensure it's resolving correctly and accessing your Lambda
function.
By following these steps, you should have your custom domain properly configured and pointing to your AWS Lambda
function via the CloudFront distribution.
The JWT verifies that the request comes from a legitimate, authenticated user.
The API key verifies that the request comes from a trusted client application.

View file

@ -1,15 +1,12 @@
from backend.app.db.connection import db_engine
from backend.app.db.models.materials import Material
from sqlalchemy.orm import sessionmaker
def get_materials():
def get_materials(session):
"""
This function will retrieve all materials from the database.
:return: A list of Material objects if successful, an empty list otherwise.
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
materials = session.query(Material).filter(Material.is_active).all()
materials = session.query(Material).filter(Material.is_active).all()
return materials if materials else []

View file

@ -1,39 +1,35 @@
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func
from backend.app.db.connection import db_engine
from backend.app.db.models.recommendations import Plan, PlanRecommendations, Recommendation
from backend.app.db.models.portfolio import Portfolio
def aggregate_portfolio_recommendations(portfolio_id: int):
Session = sessionmaker(bind=db_engine)
with Session() as session:
# Aggregate multiple fields
aggregates = (
session.query(
func.sum(Recommendation.estimated_cost).label("cost"),
# For future usage we will aggregate multiple fields in this step
# func.sum(Recommendation.heat_demand).label("total_heat_demand"),
# func.sum(Recommendation.energy_savings).label("total_energy_savings")
)
.join(PlanRecommendations, PlanRecommendations.recommendation_id == Recommendation.id)
.join(Plan, Plan.id == PlanRecommendations.plan_id)
.filter(Plan.portfolio_id == portfolio_id, Plan.is_default == True, Recommendation.default == True)
.one()
def aggregate_portfolio_recommendations(session, portfolio_id: int):
# Aggregate multiple fields
aggregates = (
session.query(
func.sum(Recommendation.estimated_cost).label("cost"),
# For future usage we will aggregate multiple fields in this step
# func.sum(Recommendation.heat_demand).label("total_heat_demand"),
# func.sum(Recommendation.energy_savings).label("total_energy_savings")
)
.join(PlanRecommendations, PlanRecommendations.recommendation_id == Recommendation.id)
.join(Plan, Plan.id == PlanRecommendations.plan_id)
.filter(Plan.portfolio_id == portfolio_id, Plan.is_default == True, Recommendation.default == True)
.one()
)
aggregates_dict = {
"cost": aggregates.cost or 0,
# "total_heat_demand": aggregates.total_heat_demand or 0,
# "total_energy_savings": aggregates.total_energy_savings or 0
}
aggregates_dict = {
"cost": aggregates.cost or 0,
# "total_heat_demand": aggregates.total_heat_demand or 0,
# "total_energy_savings": aggregates.total_energy_savings or 0
}
# Get the portfolio and update the fields
portfolio = session.query(Portfolio).filter_by(id=portfolio_id).one()
# Update the data
for key, value in aggregates_dict.items():
setattr(portfolio, key, value)
# Get the portfolio and update the fields
portfolio = session.query(Portfolio).filter_by(id=portfolio_id).one()
# Update the data
for key, value in aggregates_dict.items():
setattr(portfolio, key, value)
# Merge the updated portfolio back into the session
session.merge(portfolio)
session.commit()
# Merge the updated portfolio back into the session
session.merge(portfolio)
session.flush()

View file

@ -3,132 +3,128 @@
###
import datetime
import pytz
from sqlalchemy.orm import sessionmaker
from backend.app.db.models.portfolio import (
PropertyModel, PropertyCreationStatus, PortfolioStatus, PropertyTargetsModel, PropertyDetailsEpcModel
)
from backend.app.db.connection import db_engine
from sqlalchemy.orm.exc import NoResultFound
def create_property(portfolio_id: int, address: str, postcode: str) -> (int, bool):
def create_property(session, portfolio_id: int, address: str, postcode: str) -> (int, bool):
"""
This function will create a record for the property in the database if it does not exist.
If it does exist, it will just update the updated_at field.
:param session: The database session
:param portfolio_id: The ID of the portfolio the property belongs to
:param address: The address of the property
:param postcode: The postcode of the property
:return: The ID of the property and a boolean indicating whether it was created or not
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
address=address, postcode=postcode, portfolio_id=portfolio_id
).one()
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
address=address, postcode=postcode, portfolio_id=portfolio_id
).one()
# Update the 'updated_at' field
existing_property.updated_at = datetime.datetime.now(pytz.utc)
# Update the 'updated_at' field
existing_property.updated_at = datetime.datetime.now(pytz.utc)
# Merge the updated property back into the session
session.merge(existing_property)
session.commit()
# Merge the updated property back into the session
session.merge(existing_property)
session.flush()
return existing_property.id, False
return existing_property.id, False
except NoResultFound:
# Property doesn't exist, create a new one
new_property = PropertyModel(
address=address,
postcode=postcode,
portfolio_id=portfolio_id,
creation_status=PropertyCreationStatus.LOADING,
status=PortfolioStatus.ASSESSMENT.value,
has_pre_condition_report=False,
has_recommendations=False
)
except NoResultFound:
# Property doesn't exist, create a new one
new_property = PropertyModel(
address=address,
postcode=postcode,
portfolio_id=portfolio_id,
creation_status=PropertyCreationStatus.LOADING,
status=PortfolioStatus.ASSESSMENT.value,
has_pre_condition_report=False,
has_recommendations=False
)
# Add the new property to the session
session.add(new_property)
# Add the new property to the session
session.add(new_property)
session.commit()
session.flush()
return new_property.id, True
return new_property.id, True
def create_property_targets(property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None):
def create_property_targets(session, property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None):
"""
This function will create a record for the property targets in the database if it does not exist.
:param session: The database session
:param property_id: The ID of the property the targets belong to
:param portfolio_id: The ID of the portfolio the property belongs to
:param epc_target: Goal EPC value for the property
:param heat_demand_target: Heat demand target for the property in kwh/m^2/year
:return:
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
new_target = PropertyTargetsModel(
property_id=property_id,
portfolio_id=portfolio_id,
epc=epc_target,
heat_demand=heat_demand_target
)
session.add(new_target)
session.commit()
new_target = PropertyTargetsModel(
property_id=property_id,
portfolio_id=portfolio_id,
epc=epc_target,
heat_demand=heat_demand_target
)
session.add(new_target)
session.flush()
return True
def update_property_data(property_id: int, portfolio_id: int, property_data: dict):
Session = sessionmaker(bind=db_engine)
def update_property_data(session, property_id: int, portfolio_id: int, property_data: dict):
now = datetime.datetime.now(pytz.utc)
with Session() as session:
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
id=property_id, portfolio_id=portfolio_id
).one()
# Update the fields with the data in property_data
for key, value in property_data.items():
setattr(existing_property, key, value)
try:
# Attempt to fetch the existing property
existing_property = session.query(PropertyModel).filter_by(
id=property_id, portfolio_id=portfolio_id
).one()
existing_property.updated_at = now
# Update the fields with the data in property_data
for key, value in property_data.items():
setattr(existing_property, key, value)
# Merge the updated property back into the session and commit
session.merge(existing_property)
session.commit()
existing_property.updated_at = now
except NoResultFound:
raise Exception(f"Property with property_id {property_id} and portfolio_id {portfolio_id} not found")
# Merge the updated property back into the session and flush
session.merge(existing_property)
session.flush()
except NoResultFound:
raise Exception(f"Property with property_id {property_id} and portfolio_id {portfolio_id} not found")
return True
def create_property_details_epc(property_details_epc: dict):
def create_property_details_epc(session, property_details_epc: dict):
"""
This function will create or update a record for the property details EPC in the database.
:param session: The database session
:param property_details_epc: A dictionary containing details about the property EPC.
:return: True if successful, False otherwise.
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
existing_record = session.query(PropertyDetailsEpcModel).filter_by(
portfolio_id=property_details_epc["portfolio_id"],
property_id=property_details_epc["property_id"]
).first()
if existing_record:
# If the record exists, update its fields
for key, value in property_details_epc.items():
setattr(existing_record, key, value)
else:
# If the record doesn't exist, create a new one
new_property_details_epc = PropertyDetailsEpcModel(**property_details_epc)
session.add(new_property_details_epc)
existing_record = session.query(PropertyDetailsEpcModel).filter_by(
portfolio_id=property_details_epc["portfolio_id"],
property_id=property_details_epc["property_id"]
).first()
session.commit()
if existing_record:
# If the record exists, update its fields
for key, value in property_details_epc.items():
setattr(existing_record, key, value)
else:
# If the record doesn't exist, create a new one
new_property_details_epc = PropertyDetailsEpcModel(**property_details_epc)
session.add(new_property_details_epc)
session.flush()
return True

View file

@ -1,73 +1,112 @@
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from backend.app.db.models.recommendations import Plan, Recommendation, RecommendationMaterials
from sqlalchemy import insert
from backend.app.db.models.recommendations import Plan, Recommendation, RecommendationMaterials, PlanRecommendations
def create_plan(plan):
def create_plan(session, plan):
"""
This function will create a record for the plan in the database if it does not exist.
:param plan: dictionary of data representing a plan to be created
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
new_plan = Plan(**plan)
session.add(new_plan)
session.commit()
new_plan = Plan(**plan)
session.add(new_plan)
session.flush()
return new_plan.id
return new_plan.id
def create_recommendation(recommendation):
def create_recommendation(session, recommendation):
"""
This function will create a record for the recommendation in the database if it does not exist.
:param session: The database session
:param recommendation: dictionary of data representing a recommendation to be created
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
new_recommendation = Recommendation(**recommendation)
session.add(new_recommendation)
session.commit()
new_recommendation = Recommendation(**recommendation)
session.add(new_recommendation)
session.flush()
return new_recommendation.id
return new_recommendation.id
def create_recommendation_material(recommendation_id, material_id, depth):
def create_recommendation_material(session, recommendation_id, material_id, depth):
"""
This function will create a record for the recommendation_material in the database if it does not exist.
:param session: The databse session
:param recommendation_id: ID of the recommendation
:param material_id: ID of the material
:param depth: depth of the material, may be null if a material where depth is not applicable
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
new_recommendation_material = RecommendationMaterials(
recommendation_id=recommendation_id,
material_id=material_id,
depth=depth
)
session.add(new_recommendation_material)
session.commit()
new_recommendation_material = RecommendationMaterials(
recommendation_id=recommendation_id,
material_id=material_id,
depth=depth
)
session.add(new_recommendation_material)
session.flush()
return new_recommendation_material.id
return new_recommendation_material.id
def create_plan_recommendations(plan_id, recommendation_ids):
def create_plan_recommendations(session, plan_id, recommendation_ids):
"""
This function will create a record for the plan_recommendation in the database if it does not exist.
This function will create records for the plan_recommendation in the database.
:param plan_id: ID of the plan
:param recommendation_ids: list of recommendation IDs
"""
Session = sessionmaker(bind=db_engine)
with Session() as session:
for recommendation_id in recommendation_ids:
session.execute(
text(
'INSERT INTO plan_recommendations (plan_id, recommendation_id) VALUES (:plan_id, '
':recommendation_id)'),
{'plan_id': plan_id, 'recommendation_id': recommendation_id}
)
session.commit()
# Prepare a list of dictionaries for bulk insert
data = [{"plan_id": plan_id, "recommendation_id": rid} for rid in recommendation_ids]
# Bulk insert using SQLAlchemy's core API
session.execute(insert(PlanRecommendations).values(data))
def upload_recommendations(session, recommendations_to_upload, property_id):
# Prepare data for bulk insert for Recommendation
recommendations_data = [
{
"property_id": property_id,
"type": rec["type"],
"description": rec["description"],
"estimated_cost": rec["cost"],
"default": rec["default"],
"starting_u_value": rec.get("starting_u_value"),
"new_u_value": rec.get("new_u_value"),
"sap_points": rec["sap_points"]
}
for rec in recommendations_to_upload
]
session.bulk_insert_mappings(Recommendation, recommendations_data)
# To get the IDs of the newly inserted recommendations, we need to flush the session
session.flush()
# Map the uploaded_recommendation_ids with the original data for reference
uploaded_recommendation_ids = [rec.id for rec in session.query(Recommendation).filter(
Recommendation.property_id == property_id,
Recommendation.description.in_([rec["description"] for rec in recommendations_to_upload])
)]
# Prepare data for bulk insert for RecommendationMaterials
recommendation_materials_data = [
{
"recommendation_id": recommendation_id,
"material_id": part["id"],
"depth": part["depths"][0] if part["depths"] else None,
"quantity": part["quantity"],
"quantity_unit": part["quantity_unit"],
"estimated_cost": part["estimated_cost"],
}
for rec, recommendation_id in zip(recommendations_to_upload, uploaded_recommendation_ids)
for part in rec["parts"]
]
session.bulk_insert_mappings(RecommendationMaterials, recommendation_materials_data)
# flush the changes to get the newly created IDs
session.flush()
return uploaded_recommendation_ids

View file

@ -1,8 +1,9 @@
from sqlalchemy import Column, BigInteger, String, Float, Boolean, TIMESTAMP, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy import Column, BigInteger, String, Float, Boolean, TIMESTAMP, ForeignKey, Enum
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
from backend.app.db.models.portfolio import Portfolio, PropertyModel
from backend.app.db.models.materials import Material
from datatypes.enums import QuantityUnits
Base = declarative_base()
@ -37,6 +38,9 @@ class RecommendationMaterials(Base):
material_id = Column(BigInteger, ForeignKey(Material.id), nullable=False)
created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
depth = Column(Float, nullable=False)
quantity = Column(Float, nullable=False)
quantity_unit = Column(Enum(QuantityUnits, values_callable=lambda x: [e.value for e in x]), nullable=False)
estimated_cost = Column(Float, nullable=False)
class Plan(Base):

View file

@ -13,6 +13,8 @@ from recommendations.WallRecommendations import WallRecommendations
from utils.uvalue_estimates import classify_decile_newvalues
from backend.app.db.utils import row2dict
from starlette.responses import Response
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
# database interaction functions
from backend.app.db.functions.property_functions import (
@ -20,13 +22,15 @@ from backend.app.db.functions.property_functions import (
)
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.recommendations_functions import (
create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations
create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations,
upload_recommendations
)
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.connection import db_engine
from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from model_data.utils import epc_to_sap_lower_bound
from backend.app.utils import epc_to_sap_lower_bound
from model_data.optimiser.optimiser_functions import prepare_input_measures
# TODO: This is placeholder until data is stored in DB
@ -111,7 +115,7 @@ def insert_temp_recommendation_id(property_recommendations):
Creates a temporary recommendation id which is needed for
filtering recommendations between default and no, after the optimiser has been
run
:param property_recommendations: nested list of recommendations, grouped by types
:param property_recommendations: nested list of recommendations, grouped by data_types
:return: Updated recommendations_to_upload, where where recommendation has a "recommendation_id"
integer inserted
"""
@ -127,258 +131,266 @@ def insert_temp_recommendation_id(property_recommendations):
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting the inputs")
# Read in the trigger file from s3
bucket_name = get_settings().PLAN_TRIGGER_BUCKET
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
logger.info("Connecting to db")
Session = sessionmaker(bind=db_engine)
session = Session()
plan_input = read_csv_from_s3(bucket_name=bucket_name, filepath=body.trigger_file_path)
try:
session.begin()
logger.info("Getting the inputs")
# Read in the trigger file from s3
bucket_name = get_settings().PLAN_TRIGGER_BUCKET
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implment validation
plan_input = read_csv_from_s3(bucket_name=bucket_name, filepath=body.trigger_file_path)
# Create a record in db
property_id, is_new = create_property(
portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implment validation
# if a new record was not created, we don't produduce recommendations
if not is_new:
continue
# TODO: Need to add heat demand target
create_property_targets(
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
input_properties.append(
Property(
postcode=config['postcode'],
address1=config['address'],
epc_client=epc_client,
id=property_id
)
)
if not input_properties:
return Response(status_code=204)
logger.info("Getting EPC data")
for p in input_properties:
p.search_address_epc()
p.set_year_built()
logger.info("Getting coordinates")
# This is placeholder, until the full dataset is loaded into the database
for p in input_properties:
coordinate_data = [x for x in open_uprn_data if x['UPRN'] == int(p.data['uprn'])][0]
p.set_coordinates(coordinate_data)
logger.info("Check if property is in conservation area")
for p in input_properties:
in_conservation_area = [x for x in in_conservation_area_data if x['uprn'] == int(p.data['uprn'])][0].get(
"is_in_conservation_area"
)
p.set_is_in_conservation_area(in_conservation_area)
# The materials data could be cached or local so we don't need to make
# consistent requrests to the backend for
# the same data
# TODO: It might not be the best choice to store the materials data in a database table since thi
# table probably won't be very large and won't be updated that often. It might be better to
# store this data in s3 load it into memory when the app starts up. We will test this
materials = get_materials()
materials_by_type = filter_materials(materials)
logger.info("Getting components and properties recommendations")
# TODO: Move this to a class. We probably was a Recommender class which takes the injects the optimisers
# in as a dependency and then the optimisers can take the input measures in as part of the setup() method
recommendations = {}
for p in input_properties:
property_recommendations = []
# For each property, classiy floor area decide
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=floors_decile_data["decile_boundaries"],
decile_labels=floors_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
# Property recommendations
p.get_components(cleaned)
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
floors_u_value_estimate = [
x for x in uvalue_estimates_floors
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['floor-energy-eff'] == p.data["floor-energy-eff"] if p.data["floor-energy-eff"] != 'N/A' else True) &
(x['floor-env-eff'] == p.data["floor-env-eff"] if p.data["floor-env-eff"] != 'N/A' else True)
]
# Floor recommendations
floor_recommender = FloorRecommendations(
property_instance=p,
uvalue_estimates=floors_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["suspended_floor_insulation"] + materials_by_type["solid_floor_insulation"],
)
floor_recommender.recommend()
if floor_recommender.recommendations:
property_recommendations.append(floor_recommender.recommendations)
# Wall recommendations
# We would make this u-value query directly to the database
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=walls_decile_data["decile_boundaries"],
decile_labels=walls_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
walls_u_value_estimate = [
x for x in uvalue_estimates_walls
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['walls-energy-eff'] == p.data["walls-energy-eff"] if p.data["walls-energy-eff"] != 'N/A' else True) &
(x['walls-env-eff'] == p.data["walls-env-eff"] if p.data["walls-env-eff"] != 'N/A' else True)
]
wall_recomender = WallRecommendations(
property_instance=p,
uvalue_estimates=walls_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["external_wall_insulation"] + materials_by_type["internal_wall_insulation"]
)
wall_recomender.recommend()
if wall_recomender.recommendations:
property_recommendations.append(wall_recomender.recommendations)
# Use the optimiser to pick the default recommendations and decide if we need certain
# recommendations to get to the goal
property_recommendations = insert_temp_recommendation_id(property_recommendations)
if not property_recommendations:
continue
input_measures = prepare_input_measures(property_recommendations, body.goal)
if body.budget:
optimiser = GainOptimiser(input_measures, max_cost=body.budget)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
current_sap_points = int(p.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures, min_gain=target_sap_points - current_sap_points
# Create a record in db
property_id, is_new = create_property(
session, portfolio_id=body.portfolio_id, address=config['address'], postcode=config['postcode']
)
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
# if a new record was not created, we don't produduce recommendations
if not is_new:
continue
selected_recommendations = {r["id"] for r in solution}
# We'll use the set of selected recommendations to filter the recommendations to upload
# TODO: Need to add heat demand target
create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
property_recommendations = [
[
{**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False}
for rec in recommendations_by_type
input_properties.append(
Property(
postcode=config['postcode'],
address1=config['address'],
epc_client=epc_client,
id=property_id
)
)
if not input_properties:
return Response(status_code=204)
logger.info("Getting EPC data")
for p in input_properties:
p.search_address_epc()
p.set_year_built()
logger.info("Getting coordinates")
# This is placeholder, until the full dataset is loaded into the database
for p in input_properties:
coordinate_data = [x for x in open_uprn_data if x['UPRN'] == int(p.data['uprn'])][0]
p.set_coordinates(coordinate_data)
logger.info("Check if property is in conservation area")
for p in input_properties:
in_conservation_area = [x for x in in_conservation_area_data if x['uprn'] == int(p.data['uprn'])][0].get(
"is_in_conservation_area"
)
p.set_is_in_conservation_area(in_conservation_area)
# The materials data could be cached or local so we don't need to make
# consistent requrests to the backend for
# the same data
# TODO: It might not be the best choice to store the materials data in a database table since thi
# table probably won't be very large and won't be updated that often. It might be better to
# store this data in s3 load it into memory when the app starts up. We will test this
materials = get_materials(session)
materials_by_type = filter_materials(materials)
logger.info("Getting components and properties recommendations")
# TODO: Move this to a class. We probably was a Recommender class which takes the injects the optimisers
# in as a dependency and then the optimisers can take the input measures in as part of the setup() method
recommendations = {}
for p in input_properties:
property_recommendations = []
# For each property, classiy floor area decide
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=floors_decile_data["decile_boundaries"],
decile_labels=floors_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
# Property recommendations
p.get_components(cleaned)
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
floors_u_value_estimate = [
x for x in uvalue_estimates_floors
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['floor-energy-eff'] == p.data["floor-energy-eff"] if p.data[
"floor-energy-eff"] != 'N/A' else True) &
(x['floor-env-eff'] == p.data["floor-env-eff"] if p.data["floor-env-eff"] != 'N/A' else True)
]
for recommendations_by_type in property_recommendations
]
# We'll also unlist the recommendations so they're a bit easier to handle from here onwards
property_recommendations = [
rec for recommendations_by_type in property_recommendations for rec in recommendations_by_type
]
recommendations[p.id] = property_recommendations
# Once we're done, we'll store:
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations
logger.info("Uploading recommendations to the database")
# Upload property data
for p in input_properties:
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id, rating_lookup=rating_lookup)
create_property_details_epc(property_details_epc)
property_data = p.get_full_property_data()
update_property_data(property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data)
# Upload recommendations
recommendations_to_upload = recommendations.get(p.id, [])
if not recommendations_to_upload:
continue
# Create a plan
new_plan_id = create_plan(
{
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True
}
)
# upload recommendations
uploaded_recommendation_ids = []
for rec in recommendations_to_upload:
recommendation_id = create_recommendation(
{
"property_id": p.id,
"type": rec["type"],
"description": rec["description"],
"estimated_cost": rec["cost"],
"default": rec["default"],
"starting_u_value": rec.get("starting_u_value"),
"new_u_value": rec.get("new_u_value"),
# TODO: Placeholder for SAP points in place
"sap_points": rec["sap_points"]
# Remaining outputs yet to be handled
}
# Floor recommendations
floor_recommender = FloorRecommendations(
property_instance=p,
uvalue_estimates=floors_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["suspended_floor_insulation"] + materials_by_type["solid_floor_insulation"],
)
uploaded_recommendation_ids.append(recommendation_id)
floor_recommender.recommend()
# create the bridging between the recommendation and the materials
for part in rec["parts"]:
create_recommendation_material(
recommendation_id=recommendation_id,
material_id=part["id"],
depth=part["depths"][0] if part["depths"] else None,
if floor_recommender.recommendations:
property_recommendations.append(floor_recommender.recommendations)
# Wall recommendations
# We would make this u-value query directly to the database
total_floor_area_group_decile = classify_decile_newvalues(
decile_boundaries=walls_decile_data["decile_boundaries"],
decile_labels=walls_decile_data["decile_labels"],
new_values=[float(p.data["total-floor-area"])],
)[0]
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
walls_u_value_estimate = [
x for x in uvalue_estimates_walls
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['walls-energy-eff'] == p.data["walls-energy-eff"] if p.data[
"walls-energy-eff"] != 'N/A' else True) &
(x['walls-env-eff'] == p.data["walls-env-eff"] if p.data["walls-env-eff"] != 'N/A' else True)
]
wall_recomender = WallRecommendations(
property_instance=p,
uvalue_estimates=walls_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["external_wall_insulation"] + materials_by_type["internal_wall_insulation"]
)
wall_recomender.recommend()
if wall_recomender.recommendations:
property_recommendations.append(wall_recomender.recommendations)
# Use the optimiser to pick the default recommendations and decide if we need certain
# recommendations to get to the goal
property_recommendations = insert_temp_recommendation_id(property_recommendations)
if not property_recommendations:
continue
input_measures = prepare_input_measures(property_recommendations, body.goal)
if body.budget:
optimiser = GainOptimiser(input_measures, max_cost=body.budget)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
current_sap_points = int(p.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures, min_gain=target_sap_points - current_sap_points
)
# Finally, match the recommendation to the plan
create_plan_recommendations(
plan_id=new_plan_id,
recommendation_ids=uploaded_recommendation_ids
)
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
logger.info("Creating portfolio aggregations")
# We implement this in the simplest way possible which will be just to query the database for all
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# the portfolion level impact
aggregate_portfolio_recommendations(portfolio_id=body.portfolio_id)
selected_recommendations = {r["id"] for r in solution}
# We'll use the set of selected recommendations to filter the recommendations to upload
property_recommendations = [
[
{**rec, "default": True if rec["recommendation_id"] in selected_recommendations else False}
for rec in recommendations_by_type
]
for recommendations_by_type in property_recommendations
]
# We'll also unlist the recommendations so they're a bit easier to handle from here onwards
property_recommendations = [
rec for recommendations_by_type in property_recommendations for rec in recommendations_by_type
]
recommendations[p.id] = property_recommendations
# Once we're done, we'll store:
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations
logger.info("Uploading recommendations to the database")
# Upload property data
for p in input_properties:
property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id,
rating_lookup=rating_lookup)
create_property_details_epc(session, property_details_epc)
property_data = p.get_full_property_data()
update_property_data(session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data)
# Upload recommendations
recommendations_to_upload = recommendations.get(p.id, [])
if not recommendations_to_upload:
continue
# Create a plan
new_plan_id = create_plan(
session,
{
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True
}
)
# Upload recommendations
uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
# Finally, match the recommendation to the plan
create_plan_recommendations(
session,
plan_id=new_plan_id,
recommendation_ids=uploaded_recommendation_ids
)
logger.info("Creating portfolio aggregations")
# We implement this in the simplest way possible which will be just to query the database for all
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# the portfolion level impact
aggregate_portfolio_recommendations(session, portfolio_id=body.portfolio_id)
# Commit all changes at once
session.commit()
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database integrity error.")
except OperationalError:
logger.error("Database operational error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database operational error.")
except ValueError:
logger.error("Value error - possibly due to malformed data", exc_info=True)
session.rollback()
return Response(status_code=400, content="Bad request: malformed data.")
except Exception as e: # General exception handling
logger.error(f"An error occurred: {e}")
session.rollback()
return Response(status_code=500, content="An unexpected error occurred.")
finally:
session.close()
return Response(status_code=200)

View file

@ -65,3 +65,55 @@ def generate_api_key():
# Generate a 40 character long api key
api_key = ''.join(secrets.choice(characters) for _ in range(40))
return api_key
def sap_to_epc(sap_points: int):
"""
Simple utility function to convert SAP points to EPC rating.
:param sapPoints: numerical value of SAP points, typically between 0 and 100
:return:
"""
if sap_points <= 0 or sap_points > 100:
raise ValueError("SAP points should be between 1 and 100.")
if sap_points > 91:
return "A"
elif sap_points > 80:
return "B"
elif sap_points > 69:
return "C"
elif sap_points > 55:
return "D"
elif sap_points > 39:
return "E"
elif sap_points > 21:
return "F"
else:
return "G"
def epc_to_sap_lower_bound(epc: str):
"""
Given an EPC rating, returns the lower bound SAP score required
to hit that EPC rating
:param epc: EPC rating, between A and G
:return:
"""
if epc == "A":
return 92
elif epc == "B":
return 81
elif epc == "C":
return 70
elif epc == "D":
return 56
elif epc == "E":
return 40
elif epc == "F":
return 22
elif epc == "G":
return 1
else:
raise ValueError("EPC rating should be between A and G")

View file

@ -1,26 +1,51 @@
# Pull base image
FROM python:3.10.12-slim-buster
FROM python:3.10.12-slim-buster as build-image
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
# Set work directory to the root of your project
WORKDIR /Model
WORKDIR var/task/Model
# Install system dependencies
RUN apt-get update && apt-get install -y netcat-openbsd
RUN #apt-get update && apt-get install -y netcat-openbsd
# Install python dependencies
COPY ./backend/requirements/base.txt ./backend/requirements/base.txt
COPY ./model_data/requirements/requirements.txt ./model_data/requirements/requirements.txt
RUN pip install --upgrade pip
RUN pip install -r backend/requirements/base.txt
RUN pip install -r model_data/requirements/requirements.txt
# Install and clean up temp caches
RUN pip install -r backend/requirements/base.txt && rm -rf /root/.cache
# Copy project
COPY ./backend ./backend
COPY ./model_data ./model_data
# Since we are not using a base AWS image, there is some additional setup required. We need to set up the runtime
# interface client
# https://docs.aws.amazon.com/lambda/latest/dg/python-image.html#python-image-clients
# Additionally install the AWS Lambda RIC
RUN pip install awslambdaric
# command to run on container start
CMD ["uvicorn", "backend.app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# Second stage: "runtime-image"
FROM python:3.10.12-slim-buster
# Set work directory to the root of your project
WORKDIR /var/task/Model
# Copy the python dependencies from the build-image
COPY --from=build-image /usr/local/lib/python3.10/site-packages/ /usr/local/lib/python3.10/site-packages/
# Copy project files
COPY ./backend/ ./backend
COPY ./recommendations/ ./recommendations
COPY ./model_data/BaseUtility.py ./model_data/BaseUtility.py
COPY ./model_data/config.py ./model_data/config.py
COPY ./model_data/optimiser/ ./model_data/optimiser/
COPY ./model_data/__init__.py ./model_data/__init__.py
COPY ./model_data/EpcClean.py ./model_data/EpcClean.py
COPY ./model_data/utils.py ./model_data/utils.py
COPY ./model_data/epc_attributes/ ./model_data/epc_attributes/
COPY ./datatypes/ ./datatypes/
COPY ./utils/ ./utils/
# Set the ENTRYPOINT to the AWS Lambda RIC and CMD to your function handler
ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
# Define the handler location
CMD ["backend.app.main.handler"]

View file

@ -1,12 +1,51 @@
# Pull base image
FROM python:3.10.12-slim-buster
FROM python:3.10.12-slim-buster as build-image
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
# Set work directory to the root of your project
WORKDIR /Model
WORKDIR var/task/Model
# Install system dependencies
RUN apt-get update && apt-get install -y netcat-openbsd
#RUN apt-get update && apt-get install -y netcat-openbsd
# Install python dependencies
COPY ./backend/requirements/base.txt ./backend/requirements/base.txt
# Install and clean up temp caches
RUN pip install --upgrade pip \
&& pip install -r backend/requirements/base.txt && rm -rf /root/.cache
# Since we are not using a base AWS image, there is some additional setup required. We need to set up the runtime
# interface client
# https://docs.aws.amazon.com/lambda/latest/dg/python-image.html#python-image-clients
# Additionally install the AWS Lambda RIC
RUN pip install awslambdaric
# Second stage: "runtime-image"
FROM python:3.10.12-slim-buster
# Set work directory to the root of your project
WORKDIR /var/task/Model
# Copy the python dependencies from the build-image
COPY --from=build-image /usr/local/lib/python3.10/site-packages/ /usr/local/lib/python3.10/site-packages/
# Copy project files
COPY ./backend/ ./backend
COPY ./recommendations/ ./recommendations
COPY ./model_data/BaseUtility.py ./model_data/BaseUtility.py
COPY ./model_data/config.py ./model_data/config.py
COPY ./model_data/optimiser/ ./model_data/optimiser/
COPY ./model_data/__init__.py ./model_data/__init__.py
COPY ./model_data/EpcClean.py ./model_data/EpcClean.py
COPY ./model_data/utils.py ./model_data/utils.py
COPY ./model_data/epc_attributes/ ./model_data/epc_attributes/
COPY ./datatypes/ ./datatypes/
COPY ./utils/ ./utils/
# Set the ENTRYPOINT to the AWS Lambda RIC and CMD to your function handler
ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
# Define the handler location
CMD ["backend.app.main.handler"]

View file

@ -30,3 +30,5 @@ websockets==11.0.3
sqlalchemy==2.0.19
psycopg2-binary
pytz==2023.3
mip==1.15.0
boto3==1.28.3

5
datatypes/enums.py Normal file
View file

@ -0,0 +1,5 @@
import enum
class QuantityUnits(enum.Enum):
m2 = "m2"

View file

@ -98,4 +98,10 @@ module "route53" {
providers = {
aws.aws_use1 = aws.aws_use1
}
}
}
# Create an ECR repository for storage of the lambda's docker images
module "ecr" {
source = "./modules/ecr"
environment = var.stage
}

View file

@ -0,0 +1,29 @@
resource "aws_ecr_repository" "my_repository" {
name = "fastapi-repository-${var.environment}"
image_tag_mutability = "MUTABLE" # Allows overwriting image tags, change to IMMUTABLE if you want to prevent overwriting
image_scanning_configuration {
scan_on_push = true
}
}
resource "aws_ecr_lifecycle_policy" "my_repository_policy" {
repository = aws_ecr_repository.my_repository.name
policy = jsonencode({
rules = [
{
rulePriority = 1
description = "Retain only the last 10 images"
selection = {
tagStatus = "any"
countType = "imageCountMoreThan"
countNumber = 10
}
action = {
type = "expire"
}
}
]
})
}

View file

@ -0,0 +1,4 @@
output "ecr_repository_name" {
description = "Name of the EPR repo in AWS"
value = aws_ecr_repository.my_repository.name
}

View file

@ -0,0 +1,4 @@
variable "environment" {
description = "The environment for the ECR repository (dev or prod)"
type = string
}

View file

@ -1,4 +1,4 @@
class BaseUtility:
class Definitions:
"""
This class contains some base attributes which are used across multiple other classes
"""
@ -38,7 +38,7 @@ class BaseUtility:
# addresses will take time to develop to deal with these and future anomalies.
#
# There are several fields within the lodged data where it is possible to enter multiple entries to cater for
# different types of build within a single property, i.e. extensions. This results in multiple entries for
# different data_types of build within a single property, i.e. extensions. This results in multiple entries for
# the description fields for floor, roof and wall. For the purposes of this data release only the information
# contained within the first of these multiple entries is being provided. As there are no restrictions on the
# value in this first field it means that sometimes the first field in a multiple entry description field may

View file

@ -22,7 +22,7 @@ LAND_REGISTRY_PATHS = [
def app():
"""
For a pre-defined list of constituencies and property types, we'll download EPC data from the API
For a pre-defined list of constituencies and property data_types, we'll download EPC data from the API
and produce a dataset of cleaned fields so that when we get new properties, we can quickly
sanitise any description data
:return:

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import extract_thermal_transmittance, extract_component_types
class FloorAttributes(BaseUtility):
class FloorAttributes(Definitions):
DWELLING_BELOW = ["another dwelling below", "other premises below"]
FLOOR_TYPES = ["assumed", "to unheated space", "to external air", "suspended", "solid"]

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, find_keyword
class HotWaterAttributes(BaseUtility):
class HotWaterAttributes(Definitions):
# HEATER_TYPES refer to the main devices used for heating water. These devices can be powered by different energy
# sources.
HEATER_TYPES = [

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, remove_punctuation, find_keyword
class MainFuelAttributes(BaseUtility):
class MainFuelAttributes(Definitions):
FUEL_KEYWORDS = [
'heat network',
'mains gas',
@ -96,7 +96,7 @@ class MainFuelAttributes(BaseUtility):
if not result["fuel_type"]:
result["fuel_type"] = self.UNKNOWN_FUEL
# We'll do checks on unknown fuel types to ensure we don't miss anything
# We'll do checks on unknown fuel data_types to ensure we don't miss anything
self.is_unknown = True
return result

View file

@ -1,9 +1,9 @@
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, process_part
from typing import Dict, Union
class MainHeatAttributes(BaseUtility):
class MainHeatAttributes(Definitions):
HEAT_SYSTEMS = [
"boiler", "air source heat pump", "room heaters", "electric storage heaters", "warm air",
"electric underfloor heating", "electric ceiling heating", "community scheme",

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description, find_keyword
class MainheatControlAttributes(BaseUtility):
class MainheatControlAttributes(Definitions):
# These systems allow for the automatic regulation of temperature
THERMOSTATIC_CONTROL_KEYWORDS = [
'room thermostats',

View file

@ -1,10 +1,10 @@
import re
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import extract_component_types, extract_thermal_transmittance
class RoofAttributes(BaseUtility):
class RoofAttributes(Definitions):
ROOF_TYPES = ['pitched', 'roof room', 'loft', 'flat', 'thatched', 'at rafters', 'assumed']
DWELLING_ABOVE = ["another dwelling above", "other premises above"]

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import extract_component_types, extract_thermal_transmittance
class WallAttributes(BaseUtility):
class WallAttributes(Definitions):
WALL_TYPES = ['cavity wall', 'filled cavity', 'solid brick', 'system built', 'timber frame', 'granite or whinstone',
'as built', 'cob', 'assumed', 'sandstone or limestone']

View file

@ -1,9 +1,9 @@
from typing import Dict, Union
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from model_data.epc_attributes.attribute_utils import clean_description
class WindowAttributes(BaseUtility):
class WindowAttributes(Definitions):
GLAZING_KEYWORDS = ["glazing", "glazed", "glaze"]
GLAZING_COVERAGE = ["fully", "mostly", "partial", "some", "full", "thoughout"]
GLAZING_TYPES = ["double", "triple", "secondary", "multiple", "high performance", "single"]

View file

@ -36,13 +36,13 @@ def extract_component_types(result: dict, description: str, list_of_components:
Dict[str, Union[None, str, float]], str
]:
"""
Extracts component types from the description, updates the result dictionary, and removes the matched component
types from the description.
Extracts component data_types from the description, updates the result dictionary, and removes the matched component
data_types from the description.
:param result: Dictionary to store the results in.
:param description: Lowercase description string.
:param list_of_components: List of component types to extract from the description.
:return: A tuple containing the updated result dictionary and the description with the matched component types
:param list_of_components: List of component data_types to extract from the description.
:return: A tuple containing the updated result dictionary and the description with the matched component data_types
removed.
"""
for component in list_of_components:

View file

@ -1,7 +1,7 @@
from pathlib import Path
import numpy as np
import pandas as pd
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from simulation_system.core.Settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
@ -12,7 +12,7 @@ from simulation_system.core.Settings import (
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
COLUMNS_TO_MERGE_ON
)
)
from typing import List
@ -43,11 +43,11 @@ class DataProcessor:
if DATA_PROCESSOR_SETTINGS['epc_minimum_count'] >= 1:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
return self.data
def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON):
"""
If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields
@ -56,35 +56,33 @@ class DataProcessor:
# The groupby changes the order and we use the index to make the original data
filled_data = self.data.groupby("UPRN", group_keys=True)[columns_to_fill].apply(
lambda group: group.fillna(method='bfill').fillna(method='ffill')
).reset_index().set_index('level_1').sort_index()
).reset_index().set_index('level_1').sort_index()
self.data[columns_to_fill] = filled_data[columns_to_fill]
self.data[columns_to_fill] = filled_data[columns_to_fill]
def remap_columns(self):
"""
Remap all columns, for any non values
"""
# Map all anomaly values to None
data_anomaly_map = dict(zip(BaseUtility.DATA_ANOMALY_MATCHES, [None]*len(BaseUtility.DATA_ANOMALY_MATCHES)))
data_anomaly_map = dict(zip(Definitions.DATA_ANOMALY_MATCHES, [None] * len(Definitions.DATA_ANOMALY_MATCHES)))
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
data = self.data.replace(data_anomaly_map)
data = data.replace(np.NAN, None)
# Remap certain columns
data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
data['BUILT_FROM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP)
self.data = data
def make_cleaning_averages(self) -> pd.DataFrame:
# Define a custom function to calculate the median, excluding missing values
def median_without_missing(group):
return group[AVERAGE_FIXED_FEATURES].median(skipna=True)
cleaning_averages = self.data.groupby(
["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
observed=True,
@ -93,41 +91,58 @@ class DataProcessor:
general_averages = self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
property_averages = self.data.groupby(["PROPERTY_TYPE"], observed=True).apply(
median_without_missing).reset_index()
built_form_averages = self.data.groupby(["BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
# We can clean up any NA's in the cleaning averages with the general averages here
cleaning_averages_filled = pd.merge(cleaning_averages, general_averages, on=['PROPERTY_TYPE', 'BUILT_FORM'], suffixes=['', '_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, property_averages, on=['PROPERTY_TYPE'], suffixes=['', '_PROPERTY_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, built_form_averages, on=['BUILT_FORM'], suffixes=['', '_BUILT_FORM_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages, general_averages, on=['PROPERTY_TYPE', 'BUILT_FORM'],
suffixes=['', '_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, property_averages, on=['PROPERTY_TYPE'],
suffixes=['', '_PROPERTY_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, built_form_averages, on=['BUILT_FORM'],
suffixes=['', '_BUILT_FORM_AVERAGE'])
# Replace any missing NAN values with averages for the same Property type and built form
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(cleaning_averages_filled['TOTAL_FLOOR_AREA_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(cleaning_averages_filled['FLOOR_HEIGHT_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
# If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope and built form
# If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope
# and built form
# We can use just the property type average and replace
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(cleaning_averages_filled['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(cleaning_averages_filled['FLOOR_HEIGHT_PROPERTY_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE', 'FLOOR_HEIGHT_PROPERTY_AVERAGE'])
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_PROPERTY_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE', 'FLOOR_HEIGHT_PROPERTY_AVERAGE'])
# If there are still NA values, use BUILT FORM averages
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(cleaning_averages_filled['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(cleaning_averages_filled['FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE', 'FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE', 'FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
# If there still is na values, use average across all properties in consituecy
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(cleaning_averages_filled['TOTAL_FLOOR_AREA'].mean())
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(cleaning_averages_filled['FLOOR_HEIGHT'].mean())
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA'].mean())
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT'].mean())
# If the consituency is all NA values, then take UK AVERAGE VALUES
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE)
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
FLOOR_HEIGHT_NATIONAL_AVERAGE)
return cleaning_averages_filled
@ -143,7 +158,6 @@ class DataProcessor:
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on='UPRN')
def recast_df_columns(self, column_mappings: dict) -> None:
"""
Recast columns from the dataframe to ensure the behaviour we want
@ -156,7 +170,6 @@ class DataProcessor:
for value in values:
self.data[key] = self.data[key].astype(value)
def confine_data(self) -> None:
"""
Include all step to reduce down the data based on assumptions
@ -177,12 +190,11 @@ class DataProcessor:
self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"]
self.data = self.data[~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])]
def clean_multi_glaze_proportion(self) -> None:
"""
If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100
"""
no_multi_glaze_proportion_index = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"]) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
no_multi_glaze_proportion_index = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"]) & (
self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100

View file

@ -1,15 +1,15 @@
from pathlib import Path
from core.Settings import (
RDSAP_RESPONSE,
FLOOR_LEVEL_MAP,
RDSAP_RESPONSE,
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
EARLIEST_EPC_DATE,
EARLIEST_EPC_DATE,
FULLY_GLAZED_DESCRIPTIONS,
FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES
)
from model_data.BaseUtility import BaseUtility
)
from model_data.BaseUtility import Definitions
from tqdm import tqdm
import pandas as pd
import numpy as np
@ -21,17 +21,18 @@ RANDOM_SEED = 0
DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'
FLOAT_COLUMNS = [
'NUMBER_OPEN_FIREPLACES',
'EXTENSION_COUNT',
'TOTAL_FLOOR_AREA',
'PHOTO_SUPPLY',
'FIXED_LIGHTING_OUTLETS_COUNT',
'FLOOR_HEIGHT',
'NUMBER_HABITABLE_ROOMS',
'LOW_ENERGY_LIGHTING',
'MULTI_GLAZE_PROPORTION',
'NUMBER_HEATED_ROOMS'
]
'NUMBER_OPEN_FIREPLACES',
'EXTENSION_COUNT',
'TOTAL_FLOOR_AREA',
'PHOTO_SUPPLY',
'FIXED_LIGHTING_OUTLETS_COUNT',
'FLOOR_HEIGHT',
'NUMBER_HABITABLE_ROOMS',
'LOW_ENERGY_LIGHTING',
'MULTI_GLAZE_PROPORTION',
'NUMBER_HEATED_ROOMS'
]
def create_raw_data():
"""
@ -40,7 +41,7 @@ def create_raw_data():
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
# directories = directories[0:10]
dfs = []
dfs = []
for directory in tqdm(directories):
filepath = directory / "certificates.csv"
df = pd.read_csv(filepath, low_memory=False)
@ -52,7 +53,8 @@ def create_raw_data():
df = df[~df["FLOOR_LEVEL"].isin(["top floor", "mid floor"])]
# Change multi glaze proportion
no_multi_glaze_proportion_index = pd.isnull(df["MULTI_GLAZE_PROPORTION"]) & (df["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
no_multi_glaze_proportion_index = pd.isnull(df["MULTI_GLAZE_PROPORTION"]) & (
df["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
df.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100
# Recast
@ -63,12 +65,12 @@ def create_raw_data():
df = df.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# Map all anomaly values to None
data_anomaly_map = dict(zip(BaseUtility.DATA_ANOMALY_MATCHES, [None]*len(BaseUtility.DATA_ANOMALY_MATCHES)))
data_anomaly_map = dict(zip(Definitions.DATA_ANOMALY_MATCHES, [None] * len(Definitions.DATA_ANOMALY_MATCHES)))
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
df = df.replace(data_anomaly_map)
df = df.replace(np.NAN, None)
# Remap certain columns
df['FLOOR_LEVEL'] = df['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
df['BUILT_FROM'] = df['BUILT_FORM'].replace(BUILT_FORM_REMAP)
@ -83,7 +85,6 @@ def create_raw_data():
df[RDSAP_RESPONSE] = pd.to_numeric(df[RDSAP_RESPONSE], downcast='unsigned')
df[FLOAT_COLUMNS] = df[FLOAT_COLUMNS].apply(pd.to_numeric, downcast='float')
dfs.append(df)
data = pd.concat(dfs)
@ -95,23 +96,23 @@ def create_raw_data():
def main():
data = TabularDataset(data='./model_build_data/energy_data/cleaned_data/train_validation_data.parquet')
subsample_size = round(len(data)/100)
subsample_size = round(len(data) / 100)
data = data.sample(subsample_size, random_state=RANDOM_SEED)
predictor_RDSAP = TabularPredictor(
label=RDSAP_RESPONSE,
path="agModels-predictENERGY",
label=RDSAP_RESPONSE,
path="agModels-predictENERGY",
problem_type="regression",
eval_metric='mean_absolute_error'
).fit(data, time_limit=800, presets='high_quality', excluded_model_types=['KNN', 'CAT'])
).fit(data, time_limit=800, presets='high_quality', excluded_model_types=['KNN', 'CAT'])
test_data = TabularDataset('./model_build_data/energy_data/cleaned_data/test_data.parquet')
performance = predictor_RDSAP.evaluate(test_data)
predictions = predictor_RDSAP.predict(test_data)
predictor_RDSAP.feature_importance(test_data)
if __name__ == "__main__":
main()
main()

View file

@ -1,12 +1,13 @@
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from core.Settings import (
MANDATORY_FIXED_FEATURES,
AVERAGE_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
AVERAGE_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON
@ -15,6 +16,7 @@ from core.DataProcessor import DataProcessor
DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'
# TODO: Have a look at temporal features
def app():
@ -29,8 +31,9 @@ def app():
dataset = []
# 116
# 128048706
# PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic-certificates/domestic-E09000021-Kingston-upon-Thames')
for directory in tqdm(directories):
# PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic
# -certificates/domestic-E09000021-Kingston-upon-Thames')
for directory in tqdm(directories):
filepath = directory / "certificates.csv"
@ -44,7 +47,7 @@ def app():
# Fixed features - these are property attributes that shouldn't change over time
fixed_data = {}
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if max(property_data[MANDATORY_FIXED_FEATURES].nunique()) > 1:
continue
@ -60,26 +63,31 @@ def app():
cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list()
# Get the corresponding groupby and merge, and fill in NA values
cleaning_averages_to_merge = cleaning_averages.groupby(cleaned_columns_to_merge_on)[['TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT']].mean()
modified_property_data = pd.merge(property_data, cleaning_averages_to_merge, on=cleaned_columns_to_merge_on, suffixes=['', '_AVERAGE'])
modified_property_data['TOTAL_FLOOR_AREA'] = modified_property_data['TOTAL_FLOOR_AREA'].fillna(modified_property_data['TOTAL_FLOOR_AREA_AVERAGE'])
modified_property_data['FLOOR_HEIGHT'] = modified_property_data['FLOOR_HEIGHT'].fillna(modified_property_data['FLOOR_HEIGHT_AVERAGE'])
modified_property_data = modified_property_data.drop(columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
cleaning_averages_to_merge = cleaning_averages.groupby(cleaned_columns_to_merge_on)[
['TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT']].mean()
modified_property_data = pd.merge(property_data, cleaning_averages_to_merge, on=cleaned_columns_to_merge_on,
suffixes=['', '_AVERAGE'])
modified_property_data['TOTAL_FLOOR_AREA'] = modified_property_data['TOTAL_FLOOR_AREA'].fillna(
modified_property_data['TOTAL_FLOOR_AREA_AVERAGE'])
modified_property_data['FLOOR_HEIGHT'] = modified_property_data['FLOOR_HEIGHT'].fillna(
modified_property_data['FLOOR_HEIGHT_AVERAGE'])
modified_property_data = modified_property_data.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
for field in AVERAGE_FIXED_FEATURES:
vals = list(modified_property_data[field].dropna().unique())
vals = list(modified_property_data[field].dropna().unique())
if len(vals) > 1:
# Check the values are too far apart
# TODO: we could have multiple values here, why only use the first two?
if abs(vals[0] - vals[1]) / vals[0] > 0.1:
# Take the more recent value since it's likely to be more accurate
vals = [vals[-1]]
fixed_data[field] = np.mean(vals)
#Combine all fields together
# Combine all fields together
fixed_data.update(mandatory_field_data)
fixed_data.update(latest_field_data)
@ -128,4 +136,4 @@ def app():
if __name__ == "__main__":
app()
app()

View file

@ -36,7 +36,7 @@ class TestCleanFloor:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known floor types or thermal transmittance",
"description with no known floor data_types or thermal transmittance",
]
for description in invalid_descriptions:

View file

@ -29,7 +29,7 @@ class TestHotWaterAttributes:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known hotwater types",
"description with no known hotwater data_types",
""
]

View file

@ -29,7 +29,7 @@ class TestMainHeatControlAttributes:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known fuel types",
"description with no known fuel data_types",
]
for description in invalid_descriptions:

View file

@ -34,7 +34,7 @@ class TestMainHeatAttributes:
invalid_descriptions = [
"",
"invalid description",
"description with no known heating types",
"description with no known heating data_types",
]
for description in invalid_descriptions:

View file

@ -29,7 +29,7 @@ class TestMainHeatControlAttributes:
# Test that invalid descriptions raise a ValueError
invalid_descriptions = [
"invalid description",
"description with no known heating control types",
"description with no known heating control data_types",
]
for description in invalid_descriptions:

View file

@ -24,57 +24,3 @@ def correct_spelling(text):
corrected_text = ' '.join(corrected_words)
return corrected_text
def sap_to_epc(sap_points: int):
"""
Simple utility function to convert SAP points to EPC rating.
:param sapPoints: numerical value of SAP points, typically between 0 and 100
:return:
"""
if sap_points <= 0 or sap_points > 100:
raise ValueError("SAP points should be between 1 and 100.")
if sap_points > 91:
return "A"
elif sap_points > 80:
return "B"
elif sap_points > 69:
return "C"
elif sap_points > 55:
return "D"
elif sap_points > 39:
return "E"
elif sap_points > 21:
return "F"
else:
return "G"
def epc_to_sap_lower_bound(epc: str):
"""
Given an EPC rating, returns the lower bound SAP score required
to hit that EPC rating
:param epc: EPC rating, between A and G
:return:
"""
if epc == "A":
return 92
elif epc == "B":
return 81
elif epc == "C":
return 70
elif epc == "D":
return 56
elif epc == "E":
return 40
elif epc == "F":
return 22
elif epc == "G":
return 1
else:
raise ValueError("EPC rating should be between A and G")

View file

@ -1,6 +1,7 @@
import math
from typing import List
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from datatypes.enums import QuantityUnits
from backend.Property import Property
from recommendations.rdsap_tables import default_wall_thickness, age_band_data
from recommendations.recommendation_utils import (
@ -13,7 +14,7 @@ suspended_floor_insulation_parts = [
# Example product
# https://www.insulationsuperstore.co.uk/product/recticel-eurothane-general-purpose-pir-insulation-board-2400
# -x-1200-x-100mm.html
# All product types here:
# All product data_types here:
# https://www.insulationsuperstore.co.uk/browse/insulation/brand/recticel/filterby/application/floors.html
"type": "suspended_floor_insulation",
"description": "Rigid Insulation Foam Boards",
@ -29,7 +30,7 @@ suspended_floor_insulation_parts = [
{
# Example product
# https://www.insulationsuperstore.co.uk/product/rockwool-rwa45-acoustic-insulation-slab-100mm-2-88m2-pack.html
# All product types here:
# All product data_types here:
# https://www.insulationsuperstore.co.uk/browse/insulation/brand/rockwool/filterby/application/floors
# /material/mineral-wool.html
"type": "suspended_floor_insulation",
@ -49,7 +50,7 @@ solid_floor_insulation_parts = [
{
# Example product
# https://www.insulationexpress.co.uk/floor-insulation/solid-floor-insulation/k103-100mm
# All product types here:
# All product data_types here:
# https://www.insulationexpress.co.uk/floor-insulation/solid-floor-insulation?brand=7015&p=1
# Example screed https://www.screwfix.com/p/mapei-ultraplan-3240-self-levelling-compound-25kg/4959f
"type": "solid_floor_insulation",
@ -69,7 +70,7 @@ solid_floor_insulation_parts = [
parts = suspended_floor_insulation_parts + solid_floor_insulation_parts
class FloorRecommendations(BaseUtility):
class FloorRecommendations(Definitions):
# part L building regulations indicate that any rennovations on an existing property's walls should
# achieve a U-value of no higher than 0.3
BUILDING_REGULATIONS_PART_L_MAX_U_VALUE = 0.25
@ -305,17 +306,25 @@ class FloorRecommendations(BaseUtility):
if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
estimated_cost = cost_per_unit * self.property.floor_area
self.recommendations.append(
{
"parts": [
get_recommended_part(part, depth, cost_per_unit),
get_recommended_part(
part=part,
selected_depth=depth,
quantity=self.property.floor_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=estimated_cost
),
],
"type": "floor_insulation",
"description": self._make_floor_description(part, depth),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": estimate_sap_points(),
"cost": cost_per_unit * self.property.floor_area,
"cost": estimated_cost,
}
)

View file

@ -1,8 +1,9 @@
import itertools
import math
from datatypes.enums import QuantityUnits
from backend.Property import Property
from model_data.BaseUtility import BaseUtility
from model_data.BaseUtility import Definitions
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
get_recommended_part, get_uvalue_estimate, estimate_sap_points
@ -184,7 +185,7 @@ internal_wall_insulation_parts = [
wall_parts = external_wall_insulation_parts + internal_wall_insulation_parts
class WallRecommendations(BaseUtility):
class WallRecommendations(Definitions):
YEAR_WALLS_BUILT_WITH_INSULATION = 1990
# After 1930, Solid brick walls became less populate and instead, cavity walls became a
# more popular choice
@ -332,15 +333,25 @@ class WallRecommendations(BaseUtility):
if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
estimated_cost = cost_per_unit * self.property.insulation_wall_area
recommendations.append(
{
"parts": [get_recommended_part(part, depth, cost_per_unit)],
"parts": [
get_recommended_part(
part=part,
selected_depth=depth,
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=estimated_cost
)
],
"type": "wall_insulation",
"description": "Install " + self._make_description(part, depth),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": estimate_sap_points(),
"cost": cost_per_unit * self.property.insulation_wall_area,
"cost": estimated_cost,
}
)
@ -394,10 +405,25 @@ class WallRecommendations(BaseUtility):
if combined_new_u_value - self.U_VALUE_ERROR <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
# Here you might want to define a way to add both recommendations together.
# For now, I'm adding them as separate items in the list
ewi_esimtated_cost = ewi_cost_per_unit * self.property.insulation_wall_area
iwi_esimtated_cost = iwi_cost_per_unit * self.property.insulation_wall_area
recommendation = {
"parts": [
get_recommended_part(ewi_part, ewi_depth, ewi_cost_per_unit),
get_recommended_part(iwi_part, iwi_depth, iwi_cost_per_unit)
get_recommended_part(
part=ewi_part,
selected_depth=ewi_depth,
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=ewi_esimtated_cost
),
get_recommended_part(
part=iwi_part,
selected_depth=iwi_depth,
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
selected_total_cost=iwi_esimtated_cost
)
],
"type": "wall_insulation",
"description": (
@ -407,10 +433,7 @@ class WallRecommendations(BaseUtility):
"starting_u_value": u_value,
"new_u_value": combined_new_u_value,
"sap_points": estimate_sap_points(),
"cost": (
ewi_cost_per_unit * self.property.insulation_wall_area + iwi_cost_per_unit *
self.property.insulation_wall_area
),
"cost": ewi_esimtated_cost + iwi_esimtated_cost,
}
self.recommendations.append(recommendation)

View file

@ -110,17 +110,21 @@ def update_lowest_selected_u_value(lowest_selected_u_value, new_u_value):
return lowest_selected_u_value
def get_recommended_part(part, selected_depth, selected_cost):
def get_recommended_part(part, selected_depth, selected_total_cost, quantity, quantity_unit):
"""
Utility function to return a recommended part with the selected depth.
:param part: part to be recommended
:param selected_depth: depth of the selected part
:param selected_cost: cost of the selected depth
:param selected_total_cost: Total cost of the selected part
:param quantity: Quantity of the selected part
:param quantity_unit: Unit of the quantity
:return:
"""
recommended_part = deepcopy(part)
recommended_part["depths"] = [selected_depth]
recommended_part["cost"] = [selected_cost]
recommended_part["estimated_cost"] = selected_total_cost
recommended_part["quantity"] = quantity
recommended_part["quantity_unit"] = quantity_unit
return recommended_part

42
run_lambda_local.sh Executable file
View file

@ -0,0 +1,42 @@
#!/bin/bash
# Set up constants
IMAGE_NAME="fastapi-lambda-image"
TAG="test"
PORT="8000"
RIE_DIR="$HOME/.aws-lambda-rie"
DOCKER_ENTRYPOINT="$RIE_PATH/aws-lambda-rie"
DOCKER_CMD="/usr/local/bin/python -m awslambdaric backend.app.main.handler"
ENV_FILE_PATH="backend/.env"
AWS_CREDENTIALS_PATH="$HOME/.aws/credentials"
# Step 1: Download the AWS Lambda Runtime Interface Emulator if it doesn't exist
if [ ! -f "$RIE_PATH/aws-lambda-rie" ]; then
echo "Downloading AWS Lambda Runtime Interface Emulator..."
mkdir -p $RIE_PATH
curl -Lo $RIE_PATH/aws-lambda-rie https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie-arm64
chmod +x $RIE_PATH/aws-lambda-rie
fi
# Step 2: Build the Docker image
echo "Building Docker image..."
docker build -t $IMAGE_NAME:$TAG -f backend/docker/lambda.Dockerfile .
# Step 3: Run the Docker image with the emulator, .env file, and AWS credentials
echo "Starting the Docker container..."
CONTAINER_ID=$(docker run -d \
--env-file $ENV_FILE_PATH \
-v $AWS_CREDENTIALS_PATH:/root/.aws/credentials \
-v "$RIE_DIR:/aws-lambda" \
-p $PORT:8080 \
--entrypoint "/aws-lambda/aws-lambda-rie" \
$IMAGE_NAME:$TAG $DOCKER_CMD)
# Output information
echo "Docker container is running. Use the following command to send a request:"
echo "curl \"http://localhost:$PORT/2015-03-31/functions/function/invocations\" -d '{}'"
# Optional: If you want the script to stop the Docker container after you press a key, you can uncomment the next lines.
# echo "Press any key to stop the Docker container..."
# read -n 1 -s -r
# docker kill $CONTAINER_ID

View file

@ -2,7 +2,6 @@ service: fastapi-lambda
provider:
name: aws
runtime: python3.10
region: eu-west-2
architecture: x86_64
environment:
@ -17,6 +16,8 @@ provider:
DB_USERNAME: ${env:DB_USERNAME}
DB_PASSWORD: ${env:DB_PASSWORD}
DB_PORT: ${env:DB_PORT}
ECR_URI: ${env:ECR_URI}
GITHUB_SHA: ${env:GITHUB_SHA}
# Give lambda access to read from the bucket
iam:
role:
@ -31,44 +32,10 @@ provider:
- arn:aws:s3:::${env:PLAN_TRIGGER_BUCKET}/*
package:
individually: true
patterns:
- 'backend/**'
- '!backend/tests/**'
- 'recommendations/**'
- '!recommendations/tests/**'
# Exclude all of model_data but then re-include the files we need
- '!model_data/**'
- 'model_data/BaseUtility.py'
- 'model_data/config.py'
- 'model_data/__init__.py'
- 'model_data/EpcClean.py'
- 'model_data/utils.py'
- 'model_data/epc_attributes/**'
- '!infrastructure/**'
- '!data_collection/**'
- '!node_modules/**'
- '!conservation_areas/**'
- '!open_uprn/**'
- '!land_registry/**'
- '!pytest.ini'
- '**/README.md'
plugins:
- serverless-python-requirements
- serverless-domain-manager
custom:
pythonRequirements:
dockerizePip: true
dockerFile: backend/docker/lambda.Dockerfile
useDocker: true
dockerSsh: true
fileName: backend/requirements/base.txt
dockerBuildCmdExtraArgs:
- '--progress=plain'
customDomain:
domainName: api.${self:provider.environment.DOMAIN_NAME}
createRoute53Record: true
@ -76,7 +43,8 @@ custom:
functions:
app:
handler: backend.app.main.handler
image:
uri: ${env:ECR_URI}:${env:GITHUB_SHA}
events:
- http:
path: /{proxy+}