Compare commits

..

No commits in common. "master" and "model#dev#2" have entirely different histories.

93 changed files with 856 additions and 2456 deletions

View file

@ -1,9 +0,0 @@
modules/ml-pipeline/src/pipeline/data/predictions
modules/ml-pipeline/src/pipeline/data/fit_predictions
modules/ml-pipeline/src/pipeline/data/prepared_data
modules/ml-pipeline/src/pipeline/data/model/allmodels
modules/ml-pipeline/src/pipeline/metrics
modules/ml-pipeline/src/pipeline/__pycache__
modules/ml-pipeline/src/pipeline/.dvc
modules/ml-pipeline/src/pipeline/analysis
modules/ml-pipeline/src/pipeline/metrics

View file

@ -1,127 +0,0 @@
name: Sap Change Model Deploy
on:
push:
branches: [ sap-dev, sap-prod, heat-dev, heat-prod, carbon-dev, carbon-prod]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.10.12
- name: Install Serverless and plugins
run: |
npm install -g serverless@^3.38.0
npm install -g serverless-domain-manager@^7.3.8
- name: Install DVC
run: |
pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt
# Set up all of the secrets required for the deployment
- name: set secret prefix which is used across multiple steps
id: secret_prefix
run: |
# Convert branch name to uppercase and replace hyphens with underscores
echo "::set-output name=secret_prefix::$(echo "${{ github.ref_name }}" | tr 'a-z-' 'A-Z_')"
- name: Set domain name
id: set_domain
run: echo "::set-output name=domain::${{ secrets[format('{0}_DOMAIN_NAME', steps.secret_prefix.outputs.secret_prefix)] }}"
- name: Set ECR credentials
id: set_ecr_credentials
run: |
# Fetch the secret using the secret prefix
echo "::set-output name=ecr_uri::${{ secrets[format('{0}_ECR_URI', steps.secret_prefix.outputs.secret_prefix)] }}"
- name: Set S3 buckets
id: set_s3_buckets
run: |
# Fetch the secret using the secret prefix
echo "::set-output name=data_bucket::${{ secrets[format('{0}_DATA_BUCKET', steps.secret_prefix.outputs.secret_prefix)] }}"
echo "::set-output name=predictions_bucket::${{ secrets[format('{0}_PREDICTIONS_BUCKET', steps.secret_prefix.outputs.secret_prefix)] }}"
- name: Set stack_name
id: set_stack_name
run: |
# Take branch prefix and add "model" for stack name
stack_name=$( echo ${{ github.ref_name }} | awk -F"-" '{print $1}' | sed 's/$/model/g')
if [ -z "${stack_name}" ]; then
echo "::set-output name=stack_name::"
else
echo "::set-output name=stack_name::${stack_name}"
fi
- name: Set runtime_environment
id: set_runtime_environment
run: |
# Extract the suffix after the hyphen from the branch name
runtime_environment=$(echo "${{ github.ref_name }}" | awk -F'-' '{print $NF}')
echo "::set-output name=runtime_environment::$runtime_environment"
- name: AWS credentials for dev
if: ${{ steps.set_runtime_environment.outputs.runtime_environment }} == 'dev'
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-2
- name: AWS credentials for prod
if: ${{ steps.set_runtime_environment.outputs.runtime_environment }} == 'prod'
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.PROD_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.PROD_AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-2
- name: DVC Pull
run: |
cd modules/ml-pipeline/src/pipeline
dvc pull -r ${{ steps.set_runtime_environment.outputs.runtime_environment }}
- name: Setup Docker
uses: docker/setup-buildx-action@v1
- name: Login to ECR
run: |
aws ecr get-login-password --region eu-west-2 | docker login --username AWS --password-stdin ${{ steps.set_ecr_credentials.outputs.ecr_uri }}
# Building and pushing Docker image with caching
- name: Build and push Docker image
uses: docker/build-push-action@v3
with:
context: .
file: ./deployment/Dockerfile.prediction.lambda
push: true
tags: ${{ steps.set_ecr_credentials.outputs.ecr_uri }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64
provenance: false
build-args: |
RUNTIME_ENVIRONMENT=${{ steps.set_runtime_environment.outputs.runtime_environment }}
- name: Deploy to AWS Lambda via Serverless
env:
RUNTIME_ENVIRONMENT: ${{ steps.set_runtime_environment.outputs.runtime_environment }}
PREDICTIONS_BUCKET: ${{ steps.set_s3_buckets.outputs.predictions_bucket }}
DATA_BUCKET: ${{ steps.set_s3_buckets.outputs.data_bucket }}
DOMAIN_NAME: ${{ steps.set_domain.outputs.domain }}
ECR_URI: ${{ steps.set_ecr_credentials.outputs.ecr_uri }}
GITHUB_SHA: ${{ github.sha }}
STACK_NAME: ${{ steps.set_stack_name.outputs.stack_name }}
run: |
# Deploy to AWS Lambda via Serverless
cd deployment
sls deploy --config serverless.yml --stage ${{ steps.set_runtime_environment.outputs.runtime_environment }} --verbose

View file

@ -1,47 +0,0 @@
name: Deployment for Dev Model
on:
push:
tags:
- "**model#dev#*"
permissions: write-all
jobs:
Register-Prediction-Image-Dev:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install packages to retrieve artifacts
run: |
pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt
- name: Retrieve artifacts (dvc.lock)
env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: |
cd modules/ml-pipeline/src/pipeline
dvc pull -r dev
- name: Build Prediction docker image (TODO - NEED LAMBDA IMAGE, need to add version from gto registry)
run: |
cd modules/ml-pipeline/src/
REGISTER_MODEL_NAME=$(echo ${{ github.event.pull_request.head.ref }} | awk -F"-" '{print $1}')
docker build . --file Prediction.Dockerfile --tag ${REGISTER_MODEL_NAME}
- name: ECR Login - Dev
env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: |
echo "LOGIN TO ECR"
- name: Push Prediction image to ECR - Dev
env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: |
echo "PUSH TO ECR"

View file

@ -10,9 +10,7 @@ on:
types: types:
- closed - closed
branches: branches:
- "sap-dev" - "master"
- "heat-dev"
- "carbon-dev"
permissions: write-all permissions: write-all
@ -28,7 +26,7 @@ jobs:
- name: Install packages to register model - name: Install packages to register model
run: | run: |
pip install --upgrade pip pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Register Model - name: Register Model
run: | run: |
@ -42,14 +40,7 @@ jobs:
if [ -z "${latest_version}" ]; then if [ -z "${latest_version}" ]; then
increment_version="1.0.0" increment_version="1.0.0"
else else
increment_version=$(echo ${latest_version} | awk 'BEGIN { increment_version=$(echo ${latest_version} | awk -F'.' '{OFS="."; $1+=1; print}')
FS="\\." # Set the field separator to a period
OFS="." # Set the output field separator to a period
}
{
major = $1 + 1 # Increment the major version
print major, "0", "0" # Print the new version
}')
fi fi
new_tag=${REGISTER_MODEL_NAME}@v${increment_version} new_tag=${REGISTER_MODEL_NAME}@v${increment_version}
@ -57,7 +48,7 @@ jobs:
git tag -a ${new_tag} -m "Registering new Major Version" git tag -a ${new_tag} -m "Registering new Major Version"
git push origin ${new_tag} git push origin ${new_tag}
gto show --json > MODEL_REGISTRY.md gto show > MODEL_REGISTRY.md
git add . git add .
git commit -m "Update Registry" git commit -m "Update Registry"
git push git push
@ -73,7 +64,7 @@ jobs:
- name: Install packages to register model - name: Install packages to register model
run: | run: |
pip install --upgrade pip pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Register Model - name: Register Model
run: | run: |
@ -87,14 +78,7 @@ jobs:
if [ -z "${latest_version}" ]; then if [ -z "${latest_version}" ]; then
increment_version="0.1.0" increment_version="0.1.0"
else else
increment_version=$(echo ${latest_version} | awk 'BEGIN { increment_version=$(echo ${latest_version} | awk 'BEGIN{FS=OFS="."} {$2++; print}')
FS="\\." # Set the field separator to a period
OFS="." # Set the output field separator to a period
}
{
minor = $2 + 1 # Increment the minor version
print $1, minor, "0" # Print the new version
}')
fi fi
new_tag=${REGISTER_MODEL_NAME}@v${increment_version} new_tag=${REGISTER_MODEL_NAME}@v${increment_version}
@ -102,7 +86,7 @@ jobs:
git tag -a ${new_tag} -m "Registering new Minor Version" git tag -a ${new_tag} -m "Registering new Minor Version"
git push origin ${new_tag} git push origin ${new_tag}
gto show --json > MODEL_REGISTRY.md gto show > MODEL_REGISTRY.md
git add . git add .
git commit -m "Update Registry" git commit -m "Update Registry"
git push git push
@ -118,7 +102,7 @@ jobs:
- name: Install packages to register model - name: Install packages to register model
run: | run: |
pip install --upgrade pip pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Register Model - name: Register Model
run: | run: |
@ -132,14 +116,7 @@ jobs:
if [ -z "${latest_version}" ]; then if [ -z "${latest_version}" ]; then
increment_version="0.0.1" increment_version="0.0.1"
else else
increment_version=$(echo ${latest_version} | awk 'BEGIN { increment_version=$(echo ${latest_version} | awk 'BEGIN{FS=OFS="."} {$3++; print}')
FS="\\." # Set the field separator to a period
OFS="." # Set the output field separator to a period
}
{
patch = $3 + 1 # Increment the patch version
print $1, $2, patch # Print the new version
}')
fi fi
new_tag=${REGISTER_MODEL_NAME}@v${increment_version} new_tag=${REGISTER_MODEL_NAME}@v${increment_version}
@ -147,7 +124,7 @@ jobs:
git tag -a ${new_tag} -m "Registering new Patch Version" git tag -a ${new_tag} -m "Registering new Patch Version"
git push origin ${new_tag} git push origin ${new_tag}
gto show --json > MODEL_REGISTRY.md gto show > MODEL_REGISTRY.md
git add . git add .
git commit -m "Update Registry" git commit -m "Update Registry"
git push git push
@ -161,14 +138,14 @@ jobs:
- name: Install packages to retrieve artifacts - name: Install packages to retrieve artifacts
run: | run: |
pip install --upgrade pip pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Retrieve artifacts (dvc.lock) - name: Retrieve artifacts (dvc.lock)
env: env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }} AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: | run: |
cd modules/ml-pipeline/src/pipeline cd modules/ml-pipeline/src/pipeline/src
dvc pull -r experiments dvc pull -r experiments
- name: Push artifacts to Dev - name: Push artifacts to Dev
@ -176,7 +153,7 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }} AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: | run: |
cd modules/ml-pipeline/src/pipeline cd modules/ml-pipeline/src/pipeline/src
dvc push -r dev dvc push -r dev
Register-New-Model-Dev: Register-New-Model-Dev:
@ -196,11 +173,12 @@ jobs:
- name: Install packages to register model - name: Install packages to register model
run: | run: |
pip install --upgrade pip pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Register Model - name: Register Model
env: env:
TARGET_BRANCH: ${{ github.base_ref }} AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: | run: |
REGISTER_MODEL_NAME=$(echo ${{ github.event.pull_request.head.ref }} | awk -F"-" '{print $1}') REGISTER_MODEL_NAME=$(echo ${{ github.event.pull_request.head.ref }} | awk -F"-" '{print $1}')
@ -209,30 +187,63 @@ jobs:
git config user.name "Github-Bot" git config user.name "Github-Bot"
git config user.email "Github-Bot@no-reply.com" git config user.email "Github-Bot@no-reply.com"
latest_dev_version=$(gto history ${REGISTER_MODEL_NAME} --asc --plain | awk '{print $NF}' | awk '/dev/' | awk 'END {print}') latest_dev_version=$(gto history ${REGISTER_MODEL_NAME} --asc --plain | awk '{print $NF}' | awk '/dev/')
if [ -z "${latest_dev_version}" ]; then if [ -z "${latest_dev_version}" ]; then
increment_version="1" increment_version="1"
else else
increment_version=$(echo ${latest_dev_version} | awk '{print $NF}' | awk -F"#" '{print $3}' | awk '{$1++; print}') increment_version=$(echo ${latest_dev_version} | awk 'END{print}' | awk -F"#" '{print $3}' | awk '{$1++; print}')
fi fi
new_tag=${REGISTER_MODEL_NAME}#dev#${increment_version} new_tag=${REGISTER_MODEL_NAME}#dev#${increment_version}
latest_version=$(gto show ${REGISTER_MODEL_NAME}@latest --ref | awk -F"@" '{print $2}') latest_version=$(gto show model@latest --ref | awk -F"@" '{print $2}')
echo ${new_tag} echo ${new_tag}
commit_hash=$(gto history ${REGISTER_MODEL_NAME} --asc --plain | awk "/${latest_version}/" | awk '{print $(NF-1)}') git pull #Get new model registry md file changes
git checkout ${commit_hash} git tag -a ${new_tag} -m "Assigning stage dev to artifact temp version ${latest_version}"
# git pull #Get new model registry md file changes
git tag -a ${new_tag} -m "Assigning stage dev to artifact ${REGISTER_MODEL_NAME} version ${latest_version}"
git push origin ${new_tag} git push origin ${new_tag}
git checkout ${TARGET_BRANCH} gto show > MODEL_REGISTRY.md
git fetch --all
git pull
gto show --json > MODEL_REGISTRY.md
git add . git add .
git commit -m "Update Registry" git commit -m "Update Registry"
git push origin ${TARGET_BRANCH} git push origin master
Register-Prediction-Image-Dev:
needs: [Promote-Artefacts-To-Dev, Register-New-Model-Dev]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install packages to retrieve artifacts
run: |
pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Retrieve artifacts (dvc.lock)
env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: |
cd modules/ml-pipeline/src/pipeline/src
dvc pull -r dev
- name: Build Prediction docker image (TODO - NEED LAMBDA IMAGE, need to add version from gto registry)
run: |
cd modules/ml-pipeline/src/pipeline/
REGISTER_MODEL_NAME=$(echo ${{ github.event.pull_request.head.ref }} | awk -F"-" '{print $1}')
docker build . --file Prediction.Dockerfile --tag ${REGISTER_MODEL_NAME}
- name: ECR Login - Dev
env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: |
echo "LOGIN TO ECR"
- name: Push Prediction image to ECR - Dev
env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: |
echo "PUSH TO ECR"

View file

@ -5,7 +5,7 @@ on:
# branches: # branches:
# - "model-**" # - "model-**"
pull_request: pull_request:
branches: ["sap-dev", "heat-dev", "carbon-dev"] branches: [ "master" ]
label: label:
types: ["created", "edited"] types: ["created", "edited"]
@ -44,19 +44,19 @@ jobs:
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: | run: |
pip install --upgrade pip pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Retrieve artifacts (dvc.lock) - name: Retrieve artifacts (dvc.lock)
env: env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }} AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: | run: |
cd modules/ml-pipeline/src/pipeline cd modules/ml-pipeline/src/pipeline/src
dvc pull -r experiments dvc pull -r experiments
- name: Build Prediction docker Image - name: Build Prediction docker Image
run: | run: |
cd modules/ml-pipeline/src/ cd modules/ml-pipeline/src/pipeline/
docker build . --file Prediction.Dockerfile --tag prediction_test docker build . --file Prediction.Dockerfile --tag prediction_test
- name: Run Prediction docker container - name: Run Prediction docker container
@ -72,14 +72,14 @@ jobs:
- name: Install packages to retrieve artifacts - name: Install packages to retrieve artifacts
run: | run: |
pip install --upgrade pip pip install --upgrade pip
pip install -r modules/ml-pipeline/src/pipeline/requirements/version_control/requirements.txt pip install -r modules/ml-pipeline/src/pipeline/src/requirements/version_control/requirements.txt
- name: Retrieve artifacts (dvc.lock) - name: Retrieve artifacts (dvc.lock)
env: env:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }} AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
run: | run: |
cd modules/ml-pipeline/src/pipeline cd modules/ml-pipeline/src/pipeline/src
dvc pull -r experiments dvc pull -r experiments
- uses: actions/setup-python@v4 - uses: actions/setup-python@v4
@ -89,24 +89,13 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }} AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY }}
REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }} REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }}
TARGET_BRANCH: ${{ github.base_ref }}
run: | run: |
cd modules/ml-pipeline/src/pipeline cd modules/ml-pipeline/src/pipeline/src
echo "## Model metrics" > report.md echo "## Model metrics" > report.md
# Compare metrics to master # Compare metrics to master
git fetch --depth=1 origin ${TARGET_BRANCH}:${TARGET_BRANCH} git fetch --depth=1 origin master:master
dvc metrics diff --md --all ${TARGET_BRANCH} >> report.md dvc metrics diff --md --all master >> report.md
echo "## Scenario comparison" >> report.md
cat metrics/scenario_table.md >> report.md
echo "" >> report.md
echo "## Scenario metrics" >> report.md
cat metrics/scenario_metrics.md >> report.md
cml comment create report.md cml comment create report.md

View file

@ -10,10 +10,3 @@ repos:
rev: 22.10.0 rev: 22.10.0
hooks: hooks:
- id: black - id: black
- repo: local
hooks:
- id: dvc-push-experiment
name: DVC - Push to experiment to remote location (experiments)
entry: bash -c 'cd modules/ml-pipeline/src/pipeline && dvc push -r experiments || echo "Up to date!"'
language: system
verbose: true

View file

@ -1,34 +1,7 @@
{ ╒════════╤══════════╤═════════╕
"model": { │ name │ latest │ #dev
"version": "v12.10.12", ╞════════╪══════════╪═════════╡
"stage": { │ bob │ v1.0.0 │ - │
"dev": "v11.10.12" │ model │ v8.2.5 │ 67cd1ec │
}, │ temp │ v0.0.1 │ v0.0.1 │
"registered": true, ╘════════╧══════════╧═════════╛
"active": true
},
"sap": {
"version": "v0.14.0",
"stage": {
"dev": "v0.14.0"
},
"registered": true,
"active": true
},
"heat": {
"version": "v0.5.0",
"stage": {
"dev": "v0.5.0"
},
"registered": true,
"active": true
},
"carbon": {
"version": "v0.5.0",
"stage": {
"dev": "v0.5.0"
},
"registered": true,
"active": true
}
}

View file

@ -3,83 +3,9 @@
Creating a ML-toolkit that can be reused: Creating a ML-toolkit that can be reused:
- ML pipeline: - ML pipeline:
- A generic pipeline that has data version control, experiment - A dummy pipeline that has data version control, experiment
tracking and a model registry tracking and a model registry
- ML monitoring: - ML monitoring:
- A bolt-on service that can implement model monitoring - A bolt-on service that can implement model monitoring
There are multiple protected branches which adapt the generic pipeline to produce different models:
- sap-{dev/staging/prod}-**
- heat-{dev/staging/prod}-**
- carbon-{dev/staging/prod}-**
These branches will differ by the configuration files that define the data used and the outputs of the ML-pipeline
- There can be different additional logic for each branch but the pipeline will be the same.
# Deployment
Scripts associated to deployment can be found in the deployment/ folder.
Deployment is automated via Github Actions, where a deployment is triggered by a push to one of the
protected branch, with one of dev or prod as the suffix, describing the target environment.
The github actions file will build and push a docker image to ECR and then deploy a lambda
which produces predictions for the relevant model.
In order for this to be set up, some key environment variables needs to be inserted into Github
secrets. Each different model and protected branch has its own set of secrets which allows for flexibility
between different pipelines.
For example, for the branch sap-dev, the prefix=SAP_DEV, and the following secrets are:
- {prefix}_ECR_URI, which is the URI of the ECR repository to push to. For example, for the
sap change model this is the lambda-sap-prediction-dev repository.
- {prefix}_DOMAIN_NAME, is the custom domain name. This is likely going to be the same across the different
models, but is still included in the secrets for flexibility.
- {prefix}_DATA_BUCKET, is the name of the s3 data bucket where data to be scored by the model is stored
- {prefix}_MODEL_BUCKET, is the name of the s3 bucket where the model is stored
- {prefix}_PREDICTIONS_BUCKET, is the name of the s3 bucket where the predictions are stored
# Building and Testing the Prediction Lambda Function Locally
TODO: Generalise these instructions for the various different pipelines
This guide outlines the steps to build and test the Lambda function locally using Docker. These instructions assume you're working with a machine that has Docker installed.
### Prerequisites
Docker: Make sure Docker is installed and running on your machine.
AWS Credentials: Ensure you have AWS credentials set up on your local machine, typically stored
in ~/.aws/credentials.
Root Directory: All commands should be run from the root directory of the repository.
Step-by-Step Guide
1. Building the Docker Image
First, navigate to the root directory of the repository. Open a terminal and execute the following
2. command to build the Docker image:
```bash
docker build -t sap -f deployment/Dockerfile.prediction.lambda .
```
This will build a Docker image tagged as sap_change using the Dockerfile.prediction.lambda located
in the deployment directory.
2. Running the Docker Image
Once the image is built, you can run it using the following command:
```bash
docker run -p 9000:8080 -v ~/.aws/credentials:/root/.aws/credentials:ro -e RUNTIME_ENVIRONMENT=dev -e PREDICTIONS_BUCKET=retrofit-sap-predictions-dev sap
```
This command does the following:
Maps port 9000 on your local machine to port 8080 on the Docker container.
Mounts your AWS credentials into the Docker container in read-only mode.
Sets the RUNTIME_ENVIRONMENT variable to dev.
3. Testing the Lambda Function
To test the Lambda function, use the following curl command:
```json
curl -XPOST "http://localhost:9000/2015-03-31/functions/function/invocations" -d '{"body": "{\"file_location\": \"s3://retrofit-data-dev/sap_change_model/one_sample_test_dataset.parquet\", \"property_id\": 1, \"portfolio_id\": 4, \"created_at\": \"now\"}"}'
```
This will send a POST request to the running Lambda function and pass in the required data as JSON.

View file

@ -1,9 +0,0 @@
modules/ml-pipeline/src/pipeline/data/predictions
modules/ml-pipeline/src/pipeline/data/fit_predictions
modules/ml-pipeline/src/pipeline/data/prepared_data
modules/ml-pipeline/src/pipeline/data/model/allmodels
modules/ml-pipeline/src/pipeline/metrics
modules/ml-pipeline/src/__pycache__
modules/ml-pipeline/src/.dvc
modules/ml-pipeline/src/analysis
modules/ml-pipeline/src/metrics

View file

@ -1,25 +0,0 @@
FROM public.ecr.aws/lambda/python:3.10
# Set the working directory
WORKDIR ${LAMBDA_TASK_ROOT}
ENV PYTHONPATH "${PYTHONPATH}:${LAMBDA_TASK_ROOT}"
# Environment variables
ARG RUNTIME_ENVIRONMENT
ENV RUNTIME_ENVIRONMENT=${RUNTIME_ENVIRONMENT}
# Install necessary build tools - required to test locally
RUN yum install -y gcc python3-devel gcc-c++
# Install python packages
COPY modules/ml-pipeline/src/pipeline/requirements/predictions/requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r ./requirements.txt
# Copy the project code
COPY modules/ml-pipeline/src/pipeline ./pipeline
# Copy the handler
COPY deployment/handlers/prediction_app.py ./pipeline/prediction_app.py
WORKDIR ${LAMBDA_TASK_ROOT}/pipeline
CMD [ "prediction_app.handler" ]

View file

@ -1,123 +0,0 @@
"""
This script is the handler for the lambda prediction function, responsible
for producting predictions for a model
"""
import boto3
from botocore.exceptions import NoCredentialsError
import json
from io import StringIO
import os
import logging
from generate_predictions import generate_predictions
from core.MLModels import model_factory
from config import settings
from core.DataClient import dataclient_factory
logger = logging.getLogger()
logger.setLevel(logging.INFO)
PREDICTIONS_BUCKET = os.getenv("PREDICTIONS_BUCKET", None)
def upload_dataframe_to_s3(df, bucket, s3_file_name):
"""
Upload a pandas DataFrame to an S3 bucket as CSV
:param df: DataFrame to upload
:param bucket: Bucket to upload to
:param s3_file_name: S3 object name
:return: True if file was uploaded, else False
"""
# Initialize the S3 client
s3 = boto3.client("s3")
csv_buffer = StringIO()
# Write the DataFrame to the buffer as CSV
df.to_csv(csv_buffer, index=False)
try:
# Upload the CSV from the buffer to S3
s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
print(f"Successfully uploaded DataFrame to {bucket}/{s3_file_name}")
return True
except NoCredentialsError:
print("Credentials not available")
return False
def handler(event, context):
"""
Take in event and trigger the prediction pipeline
"""
logger.info("received event: " + str(event))
try:
body = (
json.loads(event["body"])
if not isinstance(event["body"], dict)
else event["body"]
)
property_id = body["property_id"]
portfolio_id = body["portfolio_id"]
created_at = body["created_at"]
# TODO: Implement the loading of the model and prediction
storage_filepath = f"s3://{PREDICTIONS_BUCKET}/{portfolio_id}/{property_id}/{created_at}.parquet"
logger.info(f"--- Initiate MLModel ---")
build_model_params = settings.build_model
client_params = settings.client
feature_process_params = settings.feature_processor
generate_predictions_params = settings.generate_predictions
model = model_factory(build_model_params["model_type"])
logger.info(f"--- Initiate Input DataClient ---")
input_dataclient = dataclient_factory(
dataclient_type="aws-s3",
dataclient_config=client_params["aws-s3"],
)
logger.info(f"--- Initiate Output DataClient ---")
output_dataclient = dataclient_factory(
dataclient_type="aws-s3",
dataclient_config=client_params["aws-s3"],
)
generate_predictions(
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
model=model,
target=feature_process_params["feature_processor_config"]["target"],
model_filepath=build_model_params["model_save_filepath"],
test_data_filepath=body["file_location"],
predictions_output_filepath=storage_filepath,
predictions_column_name=generate_predictions_params[
"predictions_column_name"
],
identifier_column=generate_predictions_params["identifier_column"],
)
return {
"statusCode": 200,
"body": json.dumps(
{
"message": "Successfully processed input",
"storage_filepath": storage_filepath,
}
),
}
except (Exception, KeyError, ValueError) as e:
logger.info("Prediction failed")
logger.info(e)
return {
"statusCode": 500,
"body": json.dumps({"message": "Prediction failed", "error": str(e)}),
}

View file

@ -1,53 +0,0 @@
service: ${env:STACK_NAME}
provider:
name: aws
region: eu-west-2
architecture: x86_64
environment:
RUNTIME_ENVIRONMENT: ${env:RUNTIME_ENVIRONMENT}
PREDICTIONS_BUCKET: ${env:PREDICTIONS_BUCKET}
DATA_BUCKET: ${env:DATA_BUCKET}
DOMAIN_NAME: ${env:DOMAIN_NAME}
ECR_URI: ${env:ECR_URI}
GITHUB_SHA: ${env:GITHUB_SHA}
iam:
role:
name: ${env:STACK_NAME}_s3_access
statements:
# Allow reading from the DATA_BUCKET
- Effect: Allow
Action:
- s3:*
Resource:
- arn:aws:s3:::${env:DATA_BUCKET}
- arn:aws:s3:::${env:DATA_BUCKET}/*
# Allow reading and writing to PREDICTIONS_BUCKET
- Effect: Allow
Action:
- s3:*
Resource:
- arn:aws:s3:::${env:PREDICTIONS_BUCKET}
- arn:aws:s3:::${env:PREDICTIONS_BUCKET}/*
plugins:
- serverless-domain-manager
custom:
customDomain:
domainName: api.${self:provider.environment.DOMAIN_NAME}
basePath: ${env:STACK_NAME}
createRoute53Record: true
certificateArn: ${ssm:/ssl_certificate_arn}
functions:
sap_prediction_lambda:
image:
uri: ${env:ECR_URI}:${env:GITHUB_SHA}
events:
- http:
path: /predict
method: POST
timeout: 120 # Set max run time to 2 minutes - we shouldn't need this much time so this can be reviewed

View file

@ -0,0 +1,2 @@
['remote "myremote"']
url = /tmp/dvcstore

View file

@ -1,6 +1,4 @@
.dev_env/ .dev_env/
.dev_env_pipeline/
__pycache__/ __pycache__/
.DS_Store .DS_Store
.vscode/ .vscode/
data/

2
modules/ml-pipeline/.gto Normal file
View file

@ -0,0 +1,2 @@
# .gto config file
stages: [dev, stage, prod] # list of allowed Stages

View file

@ -20,6 +20,6 @@ repos:
hooks: hooks:
- id: dvc-push-experiment - id: dvc-push-experiment
name: DVC - Push to experiment to remote location (experiments) name: DVC - Push to experiment to remote location (experiments)
entry: bash -c 'cd modules/ml-pipeline/src/pipeline && dvc push -r experiments || echo "Up to date!"' entry: bash -c 'cd modules/ml-pipeline/src/pipeline/src && dvc push -r experiments || echo "Up to date!"'
language: system language: system
verbose: true verbose: true

View file

@ -1,25 +1,9 @@
export PYENV_ROOT=$(HOME)/.pyenv export PYENV_ROOT=$(HOME)/.pyenv
export PATH := $(PYENV_ROOT)/bin:$(PATH) export PATH := $(PYENV_ROOT)/bin:$(PATH)
PYTHON_VERSION ?= 3.10.12 PYTHON_VERSION ?= 3.10.12
CONDA_ENV=dev_env_pipeline
.PHONY: init .PHONY: init
init: dev-conda init: dev-pyenv
.PHONY: dev-conda
dev-conda:
# conda deactivate || echo "Not in conda environment"
# conda remove --name ${CONDA_ENV} --all -y || echo "No environment created previously"
conda create --name ${CONDA_ENV} python=$(PYTHON_VERSION) -y
conda init bash
conda run -v -n ${CONDA_ENV} pip install --upgrade pip
conda run -v -n ${CONDA_ENV} pip install -r src/pipeline/requirements/training/requirements-dev.txt
conda run -v -n ${CONDA_ENV} pip install -r src/pipeline/requirements/version_control/requirements.txt
conda run -v -n ${CONDA_ENV} pre-commit install
conda run -v -n ${CONDA_ENV} pip install ipykernel
echo "TO ACTIVATE ENVIRONMENT, USE THE FOLLOWING COMMAND"
echo "conda activate ${CONDA_ENV}"
.PHONY: dev-pyenv .PHONY: dev-pyenv
dev-pyenv: dev-pyenv:
@ -27,7 +11,7 @@ dev-pyenv:
pyenv install ${PYTHON_VERSION} || echo "Python version already installed" pyenv install ${PYTHON_VERSION} || echo "Python version already installed"
pyenv global ${PYTHON_VERSION} pyenv global ${PYTHON_VERSION}
python3 -m venv .dev_env_pipeline python3 -m venv .dev_env_pipeline
. .dev_env_pipeline/bin/activate && pip install --upgrade pip && pip install -r src/pipeline/requirements/training/requirements-dev.txt && pip install -r src/pipeline/requirements/version_control/requirements.txt && pre-commit install . .dev_env_pipeline/bin/activate && pip install --upgrade pip && pip install -r src/pipeline/src/requirements/training/requirements-dev.txt && pip install -r src/pipeline/src/requirements/version_control/requirements.txt && pre-commit install
echo "TO ACTIVATE ENVIRONMENT, USE THE FOLLOWING COMMAND" echo "TO ACTIVATE ENVIRONMENT, USE THE FOLLOWING COMMAND"
echo "source .dev_env_pipeline/bin/activate" echo "source .dev_env_pipeline/bin/activate"

View file

@ -1,30 +1,16 @@
# ML-pipeline # ML-pipeline
This is a generic ML-pipeline, consisting of: This is a dummy ML-pipeline, consisting of:
- dvc tracking for version control (data and models) - dvc tracking for version control (data and models)
- gto for model registry - gto for model registry
- docs, created via sphinx (in pre-commit hooks) - docs, created via sphinx (in pre-commit hooks)
- tests for unit, integration and end to end testing - tests for unit, integration and end to end testing
Within `src` folder, the structure is as follows: Within `src` folder, the structure is as follows:
- `pipeline` folder, which contains all the codebase for the generic pipeline - multiple pipelines can be defined
- The pipeline can track multiple models through dvc and gto model registry - i.e. for a product, we might require multuple pipelines do deliver a result
- Deployment files: - i.e. multiple models
- Prediction.Dockerfile - code to create the prediction deployment image - these models can be all tracked within the same gto model registry
- Training.Dockerfil - code to create the training image (i.e. for remote training on EC2/ Fargate)
- Docker development environment:
- If you wish to develop within a docker.
# How to develop using this pipeline:
Run `make init`, which will:
- Download pyenv (Python version management)
- Download Python 3.X.X as defined in the `make` file - current 3.10.12
- Create a virtual environment with this version of python
- Install packages in the training and version control directories in the pipeline folder (dev version if applicable)
- Install pre-commit to enable pre-commit hooks
To use the environment, run `source .dev_env_pipeline/bin/activate`.
To enable the virtual envrionemnt created in vscode: To enable the virtual envrionemnt created in vscode:
- Open settings - Open settings

View file

@ -1,8 +0,0 @@
pipeline/data/predictions
pipeline/data/fit_predictions
pipeline/data/prepared_data/train.parquet
pipeline/data/fit_predictions
pipeline/data/model/allmodels
pipeline/metrics
pipeline/.dvc
pipeline/analysis

View file

@ -1,3 +0,0 @@
# The generic reproducible ML-pipeline
Pipeline required to build a model to produce an output, that gets hashed via DVC

View file

@ -1,3 +0,0 @@
# Ignore dynaconf secret files
.secrets.*

View file

@ -1,149 +0,0 @@
"""
Second Pipieline step:
Once we have the features, we build a model
"""
import os
import yaml
import pandas as pd
from typing import Union, List
from pathlib import Path
from core.Logger import logger
from core.interface.InterfaceMetrics import MLMetrics
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from configs.post_prediction_logic import post_prediction_logic
from config import settings
logger.info(f"--- Initiate Parameters ---")
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
prepare_data_params = settings.prepare_data
build_model_params = settings.build_model
feature_process_params = settings.feature_processor
generate_metrics_params = settings.generate_metrics
generate_predictions_params = settings.generate_predictions
model_type = build_model_params["model_type"]
target = feature_process_params["feature_processor_config"]["target"]
fit_predictions_filepath = build_model_params["fit_predictions_filepath"]
predictions_column_name = generate_predictions_params["predictions_column_name"]
identifier_columns = feature_process_params["feature_processor_config"][
"identifier_columns"
]
model_save_location = build_model_params["model_save_filepath"]
model_hyperparameters = build_model_params[model_type]
train_filepath = prepare_data_params["output_train_filepath"]
test_filepath = prepare_data_params["output_test_filepath"]
fit_metrics_filepath = build_model_params["fit_metrics_filepath"]
logger.info(f"--- Initiate DataClient ---")
# Output of previous prepare data step, will be where the data is
dataclient = dataclient_factory(prepare_data_params["output_dataclient_type"])
logger.info(f"--- Initiate MLModel ---")
model = model_factory(model_type)
logger.info(f"--- Initiate Metrics ---")
metrics = metrics_factory(generate_metrics_params["metrics_type"])
def build_model(
dataclient: DataClient,
model: MLModel,
metrics: MLMetrics,
target: str,
identifier_columns: List[str],
model_save_location: str,
model_hyperparameters: dict,
fit_predictions_filepath: str,
predictions_column_name: str,
fit_metrics_filepath: str,
train_filepath: Union[str, None] = None,
test_filepath: Union[str, None] = None,
train_data: Union[pd.DataFrame, None] = None,
test_data: Union[pd.DataFrame, None] = None,
pipeline_mode: bool = False,
):
logger.info("--- Loading Data for build process ---")
if train_data is None:
if train_filepath is None:
raise ValueError(f"Need {train_filepath} if no data supplied")
train_data = dataclient.load_data(location=train_filepath, load_config=None)
if test_data is None:
if test_filepath is None:
raise ValueError(f"Need {test_filepath} if no data supplied")
test_data = dataclient.load_data(location=test_filepath, load_config=None)
logger.info("--- Training model ---")
model.train_model(
data=train_data.drop(columns=identifier_columns),
target=target,
model_hyperparameters=model_hyperparameters,
)
logger.info("--- Generating fit predictions ---")
fit_predictions = model.predict(
data=train_data, post_prediction_logic=post_prediction_logic
)
logger.info("--- Saving fit predictions ---")
predictions_df = pd.DataFrame(fit_predictions)
predictions_df.columns = [predictions_column_name]
dataclient.save_data(
obj=predictions_df, location=fit_predictions_filepath, save_config=None
)
logger.info("--- Generating fit metrics ---")
metrics_output = metrics.generate_metrics(
target=train_data[target],
predictions=pd.Series(fit_predictions),
)
logger.info("--- Saving model ---")
model.save_model(path=Path(model_save_location))
logger.info("--- Saving fit metrics ---")
dataclient.save_data(
obj=metrics_output, location=fit_metrics_filepath, save_config=None
)
if __name__ == "__main__":
logger.info(f"--- {__file__} - Start! ---")
logger.info(f"--- Build Model Stage ---")
build_model(
dataclient=dataclient,
model=model,
metrics=metrics,
target=target,
identifier_columns=identifier_columns,
model_save_location=model_save_location,
model_hyperparameters=model_hyperparameters,
train_filepath=train_filepath,
test_filepath=test_filepath,
fit_metrics_filepath=fit_metrics_filepath,
fit_predictions_filepath=fit_predictions_filepath,
predictions_column_name=predictions_column_name,
)
logger.info(f"--- {__file__} - Complete! ---")

View file

@ -1,71 +0,0 @@
"""
Third part of the pipeline:
After the model is built, we can evaluate its performance
"""
import os
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.Logger import logger
from config import settings
from generate_predictions import generate_predictions
logger.info(f"--- Initiate Parameters ---")
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
client_params = settings.client
prepare_data_params = settings.prepare_data
build_model_params = settings.build_model
generate_predictions_params = settings.generate_predictions
feature_process_params = settings.feature_processor
input_dataclient_type = generate_predictions_params["input_dataclient_type"]
output_dataclient_type = generate_predictions_params["output_dataclient_type"]
test_data_filepath = generate_predictions_params["test_data_filepath"]
test_data_filepath = os.environ.get("PREDICTION_FILE", test_data_filepath)
target = feature_process_params["feature_processor_config"]["target"]
model_filepath = build_model_params["model_save_filepath"]
predictions_output_filepath = generate_predictions_params["predictions_output_filepath"]
predictions_column_name = generate_predictions_params["predictions_column_name"]
logger.info(f"--- Initiate MLModel ---")
model = model_factory(build_model_params["model_type"])
logger.info(f"--- Initiate DataClient ---")
# We may have different locations of loading hence why we use one specified in generate_predictions.yaml
# I.e. for metric runs, this will be a local data client
# For predictions, we will want a cloud data client
input_dataclient = dataclient_factory(
dataclient_type=input_dataclient_type,
dataclient_config=client_params[input_dataclient_type],
)
output_dataclient = dataclient_factory(
dataclient_type=output_dataclient_type,
dataclient_config=client_params[output_dataclient_type],
)
if __name__ == "__main__":
logger.info(f"--- {__file__} - Start! ---")
logger.info(f"--- Generate Predictions Stage---")
generate_predictions(
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
model=model,
target=target,
model_filepath=model_filepath,
test_data_filepath=test_data_filepath,
predictions_output_filepath=predictions_output_filepath,
predictions_column_name=predictions_column_name,
)
logger.info(f"--- {__file__} - Complete! ---")

View file

@ -1,113 +0,0 @@
"""
Third part of the pipeline:
After the model is built, we can evaluate its performance
"""
import os
import yaml
import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceMetrics import MLMetrics
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.Logger import logger
from config import settings
logger.info(f"--- Initiate Parameters ---")
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
client_params = settings.client
prepare_data_params = settings.prepare_data
build_model_params = settings.build_model
generate_predictions_params = settings.generate_predictions
generate_metrics_params = settings.generate_metrics
feature_process_params = settings.feature_processor
target = feature_process_params["feature_processor_config"]["target"]
test_data_filepath = generate_predictions_params["test_data_filepath"]
predictions_output_filepath = generate_predictions_params["predictions_output_filepath"]
predictions_column_name = generate_predictions_params["predictions_column_name"]
metrics_output_filepath = generate_metrics_params["metrics_output_filepath"]
logger.info(f"--- Initiate MLModel ---")
model = model_factory(build_model_params["model_type"])
logger.info(f"--- Initiate DataClient ---")
# Use data client for input and output, as we use dvc to cache later to the cloud
dataclient_type = generate_metrics_params["dataclient_type"]
dataclient = dataclient_factory(
dataclient_type=dataclient_type,
dataclient_config=client_params[dataclient_type],
)
logger.info(f"--- Initiate MLMetrics ---")
metrics = metrics_factory(generate_metrics_params["metrics_type"])
def generate_metrics(
input_dataclient: DataClient,
output_dataclient: DataClient,
model: MLModel,
metrics: MLMetrics,
target: str,
test_data_filepath: str,
predictions_output_filepath: str,
predictions_column_name: str,
metrics_output_filepath: str,
):
"""
For a given model, we generate prediction and evaluate this against the true target
"""
logger.info("--- Loading test data ---")
test_data = input_dataclient.load_data(
location=test_data_filepath, load_config=None
)
logger.info("--- Loading predictions ---")
predictions = input_dataclient.load_data(
location=predictions_output_filepath, load_config=None
)
logger.info("--- Generating metrics ---")
metrics_output = metrics.generate_metrics(
target=test_data[target],
predictions=pd.Series(predictions[predictions_column_name]),
)
logger.info("--- Saving metrics ---")
output_dataclient.save_data(
obj=metrics_output, location=metrics_output_filepath, save_config=None
)
if __name__ == "__main__":
logger.info(f"--- {__file__} - Start! ---")
logger.info(f"--- Generate Metrics Stage---")
generate_metrics(
input_dataclient=dataclient,
output_dataclient=dataclient,
model=model,
metrics=metrics,
target=target,
test_data_filepath=test_data_filepath,
predictions_output_filepath=predictions_output_filepath,
predictions_column_name=predictions_column_name,
metrics_output_filepath=metrics_output_filepath,
)
logger.info(f"--- {__file__} - Complete! ---")

View file

@ -1,162 +0,0 @@
"""
Fourth part of the pipeline:
After the model is built and metrics are generated,
we want to test this model against known scenarios
"""
import os
import pandas as pd
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceMetrics import MLMetrics
from configs.post_prediction_logic import post_prediction_logic
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.Logger import logger
from config import settings
logger.info(f"--- Initiate Parameters ---")
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
client_params = settings.client
prepare_data_params = settings.prepare_data
build_model_params = settings.build_model
generate_predictions_params = settings.generate_predictions
generate_metrics_params = settings.generate_metrics
feature_process_params = settings.feature_processor
scenarios_params = settings.scenarios
model_filepath = build_model_params["model_save_filepath"]
target = feature_process_params["feature_processor_config"]["target"]
scenario_data_filepaths = scenarios_params["scenario_data_filepaths"]
predictions_column_name = generate_predictions_params["predictions_column_name"]
comparison_output_filepath = scenarios_params["comparison_output_filepath"]
metrics_output_filepath = scenarios_params["metrics_output_filepath"]
logger.info(f"--- Initiate MLModel ---")
model = model_factory(build_model_params["model_type"])
logger.info(f"--- Initiate DataClient ---")
# Use data client for input and output, as we use dvc to cache later to the cloud
input_dataclient_type = scenarios_params["input_dataclient_type"]
input_dataclient = dataclient_factory(
dataclient_type=input_dataclient_type,
dataclient_config=client_params[input_dataclient_type],
)
output_dataclient_type = scenarios_params["output_dataclient_type"]
output_dataclient = dataclient_factory(
dataclient_type=output_dataclient_type,
dataclient_config=client_params[output_dataclient_type],
)
logger.info(f"--- Initiate MLMetrics ---")
metrics = metrics_factory(generate_metrics_params["metrics_type"])
def generate_scenario_predictions(
input_dataclient: DataClient,
output_dataclient: DataClient,
model: MLModel,
metrics: MLMetrics,
model_filepath: str,
scenario_data_filepaths: list,
predictions_column_name: str,
comparison_output_filepath: str,
metrics_output_filepath: str,
):
"""
Given the new model, we generate prediction for expected scenarios
"""
logger.info("--- Loading Scenario Data ---")
scenario_data = pd.DataFrame()
# If we have no scenario data, we can save empty dataframes
if scenario_data_filepaths is None:
logger.info("No scenario data filepaths provided")
output_dataclient.save_data(
obj=scenario_data, location=comparison_output_filepath, save_config=None
)
output_dataclient.save_data(
obj=scenario_data, location=metrics_output_filepath, save_config=None
)
return
# Can have multiple scenario data files
for scenario_data_filepath in scenario_data_filepaths:
scenario_data = pd.concat(
[
scenario_data,
input_dataclient.load_data(scenario_data_filepath, load_config=None),
]
)
logger.info("--- Loading Model ---")
model.load_model(model_filepath)
logger.info("--- Generating Predictions ---")
predictions = model.predict(
data=scenario_data, post_prediction_logic=post_prediction_logic
)
logger.info("--- Generate Scenario Predicted Impact ---")
predictions_df = pd.DataFrame(predictions)
predictions_df.columns = [predictions_column_name]
scenario_data = pd.concat([scenario_data, predictions_df], axis=1)
scenario_data["predicted_impact"] = abs(
scenario_data[predictions_column_name] - scenario_data["sap_starting"]
)
logger.info("--- Generate Metrics ---")
metrics_dict = metrics.generate_metrics(
scenario_data["impact"], scenario_data["predicted_impact"]
)
metrics_df = pd.DataFrame(metrics_dict, index=[0]).T.reset_index()
metrics_df.columns = ["metric", "value"]
logger.info("--- Save prediction into metrics ---")
output_df = scenario_data[["uprn", "id", "impact", "predicted_impact"]]
output_dataclient.save_data(
obj=output_df, location=comparison_output_filepath, save_config=None
)
output_dataclient.save_data(
obj=metrics_df, location=metrics_output_filepath, save_config=None
)
if __name__ == "__main__":
logger.info(f"--- {__file__} - Start! ---")
logger.info(f"--- Generate Scenario Predictions ---")
generate_scenario_predictions(
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
model=model,
metrics=metrics,
model_filepath=model_filepath,
scenario_data_filepaths=scenario_data_filepaths,
predictions_column_name=predictions_column_name,
comparison_output_filepath=comparison_output_filepath,
metrics_output_filepath=metrics_output_filepath,
)
logger.info(f"--- {__file__} - Complete! ---")

View file

@ -1,16 +1,14 @@
# Dockerfile that can be used to test loading a model to generate a prediction (part of CI/CD flow) # Dockerfile that can be used to test loading a model to generate a prediction (part of CI/CD flow)
FROM python:3.10.12-slim FROM python:3.10.12-slim
RUN apt-get update && apt-get install -y libgomp1 gcc python3-dev COPY src/requirements/predictions/requirements.txt requirements.txt
COPY pipeline/requirements/predictions/requirements.txt requirements.txt
RUN pip install --upgrade pip RUN pip install --upgrade pip
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
# Assuming in the CI/CD step, there will be a dvc pull step to get data and model, so will just need to run a single script # Assuming in the CI/CD step, there will be a dvc pull step to get data and model, so will just need to run a single script
COPY pipeline/ /home/pipeline/ COPY src/ /home/src/
WORKDIR /home/pipeline/ WORKDIR /home/src/
CMD [ "python", "3_generate_predictions.py"] CMD [ "python", "generate_predictions.py"]

View file

@ -1,40 +1,3 @@
# Training # Pipeline 1
This folder contains the code base for training experimentation. Pipeline required to build a model to produce an output
To understand the pipeline, run `dvc dag`
There are 4 main steps:
- Preparing data
- This is loading data (locally or from s3)
- Splitting the data into train and validation
- Creating additional features (if needed)
- **Data is cached**
- This will be down to the dvc remote location
- Build model
- For the prepared data, we build a model using our configurations
- Model is saved (locally or s3)
- **Model and fit metrics are cached**
- This will be down to the dvc remote location
- Generate Predictions
- For the given model, we generate predictions on validation test data
- **Predictions are cached**
- This will be down to the dvc remote location
- Generate Metrics
- For the given model, we generate metrics on validation data/test data
- **Metrics are cached**
- This will be down to the dvc remote location
Workflow:
- Use `dvc metrics show` to view current metrics score
- Adjust parameters/ codebase
- When happy with changes, use `dvc exp run` to trigger an experiment
- Due to cache, only need stages are re-run
- Use `dvc metrics diff` to check the change in metrics
- Use `dvc exp show` to view all experiments
- NOTE: the last experiment will always be applied to the workspace!
- After running experiments, you can apply the the best model to workspace using `dvc exp apply [EXPERIMENT_NAME]`
- This experiment will have the corresponding .dvc files for the hashed model and data
- Use version control as normal
- git add, git commit etc
- To revert change, use `git checkout {COMMIT_HASH}`, followed by `git switch -c {NEW_BRANCH_NAME}`

View file

@ -1,15 +0,0 @@
from dynaconf import Dynaconf
settings = Dynaconf(
environments=True,
envvar_prefix="DYNACONF",
settings_files=[
"./configs/settings.yaml",
"./configs/build_model.yaml",
"./configs/analysis.yaml",
"./configs/scenarios.yaml",
],
)
# `envvar_prefix` = export envvars with `export DYNACONF_FOO=bar`.
# `settings_files` = Load these files in the order.

View file

@ -1,16 +0,0 @@
default:
model_analysis:
dataclient_type: local
feature_importance_filepath: ./analysis/feature_importance.parquet
permutation_subsample_amount: 1000
loss_fns: "mean_absolute_percentage_error"
feature_importance_column: importance
n_repeats: 5
figwidth: 7
figheight: 6
prediction_analysis:
dataclient_type: local
nshap_samples: 100 # how many samples to use to approximate each Shapely value, larger values will be slower
n_val: 30 # how many datapoints from validation data should we interpret predictions for, larger values will be slower
row_index: [20695, 50243, 7653] # index of an example datapoint

View file

@ -1,22 +0,0 @@
default:
build_model:
model_type: AutogluonAutoML
model_save_filepath: ./data/model/optimised/
fit_metrics_filepath: ./metrics/fit_metrics.json
fit_predictions_filepath: ./data/fit_predictions/predictions.parquet
SKLearnLinearRegression: null
SKLearnSVMRegression:
kernel: "linear"
AutogluonAutoML:
output_filepath: ./data/model/allmodels/
problem_type: regression
eval_metric: mean_squared_error #mean_absolute_error
time_limit: 1800
presets: medium_quality
excluded_model_types: ['RF', 'CAT', 'NN_TORCH', 'KNN', 'XT']
infer_limit: 0.05
infer_limit_batch_size: 10000
ag_args_ensemble: {'num_folds_parallel': 2}

View file

@ -1,75 +0,0 @@
"""
During the feature processor step, we can apply additional business logic and feature generation by defining them here
"""
"""
Business Logic dict + functions
"""
def remove_starting_columns(df):
keep_column_index = [
False if col_name.endswith("_starting") else True
for col_name in list(df.columns)
]
keep_columns = df.columns[keep_column_index].to_list()
keep_columns.append("sap_starting")
df = df[keep_columns]
return df
def remove_floor_height_ending(df):
# df.describe(percentiles=[0.005,0.99])['FLOOR_HEIGHT_ENDING']
# shows bottom 0.5 percentile is 1.665
# So keep anything above this
df = df[df["floor_height_ending"] > 1.665].reset_index(drop=True)
print("we in here")
return df
def remove_minimum_habitable_room_size(df):
# Need minimum of 6.5m per habitable room
df = df[
df["total_floor_area_ending"] / df["number_habitable_rooms"] > 6.5
].reset_index(drop=True)
return df
def keep_flats(df):
df = df[df["property_type"] == "Flat"]
return df
def keep_non_zero_rdsap(df):
df = df[df["rdsap_change"] != 0]
return df
# def keep_ending_columns(df):
# ending_column_index = [ col_name.endswith("_ENDING") for col_name in list(df.columns)]
# keep_columns = df.columns[ending_column_index].to_list()
# keep_columns.append("SAP_STARTING")
# print(keep_columns)
# df = df[keep_columns]
# return df
business_logic = {
# "keep_non_zero_rdsap": keep_non_zero_rdsap,
# "keep_flats": keep_flats,
# "remove_minimum_habitable_room_size": remove_minimum_habitable_room_size,
# "remove_floor_height_ending": remove_floor_height_ending
# "remove_starting_columns": remove_starting_columns
# "keep_ENDING_COLUMNS": keep_ending_columns
}
"""
New features dict + function
"""
# def SAP_ENDING(df):
# return df["SAP_STARTING"] + df["RDSAP_CHANGE"]
# new_feature_funcs = {"SAP_ENDING": SAP_ENDING}
new_feature_funcs = {}

View file

@ -1,35 +0,0 @@
"""
After predictions, we may want to apply some post processing to the predictions
"""
import pandas as pd
def clip_predictions_to_minimum_value(
data: pd.DataFrame, predictions: pd.Series, minimum_value: int = 0
) -> pd.Series:
series_name = predictions.name
predictions.name = "predictions"
predictions_df = pd.concat([data, predictions], axis=1)
# We expect all prediction to be atleast one point improvement
replace_index = (
predictions_df["sap_starting"] + minimum_value > predictions_df["predictions"]
)
predictions_df.loc[replace_index, "predictions"] = (
predictions_df.loc[replace_index, "sap_starting"] + minimum_value
)
predictions_new = predictions_df["predictions"]
predictions_new.name = series_name
return predictions_new
# def round_predictions(data: pd.DataFrame, predictions: pd.Series) -> pd.Series:
# return predictions.round()
post_prediction_logic = {
"clip_predictions_to_minimum_value": clip_predictions_to_minimum_value,
# "round_predictions": round_predictions
}

View file

@ -1,13 +0,0 @@
default:
scenarios:
input_dataclient_type: aws-s3
output_dataclient_type: local
scenario_data_filepaths:
# - s3://retrofit-data-dev/scenario_data/22-03-2024-19-20-09/recommendations_scoring_data.parquet
# - s3://retrofit-data-dev/scenario_data/24-03-2024-20-23-25/recommendations_scoring_data.parquet
# - s3://retrofit-data-dev/scenario_data/27-03-2024-11-38-15/recommendations_scoring_data.parquet
# - s3://retrofit-data-dev/scenario_data/26-05-2024-08-47-45/recommendations_scoring_data.parquet
# - s3://retrofit-data-dev/scenario_data/26-05-2024-10-44-53/recommendations_scoring_data.parquet
- s3://retrofit-data-dev/scenario_data/28-05-2024-19-22-41/recommendations_scoring_data.parquet
comparison_output_filepath: ./metrics/scenario_table.md
metrics_output_filepath: ./metrics/scenario_metrics.md

View file

@ -1,81 +0,0 @@
default:
startup_cleanup:
artefacts: ./data
metrics: ./metrics
client:
aws-s3:
AWS_ACCESS_KEY_ID: null # Use local credentials
AWS_SECRET_ACCESS_KEY: null # Use local credentials
ENDPOINT_URL: null # Use local credentials
aws-s3-mock:
AWS_ACCESS_KEY_ID: minio
AWS_SECRET_ACCESS_KEY: minio123
ENDPOINT_URL: http://localhost:9000
local:
null
prepare_data:
input_dataclient_type: aws-s3
output_dataclient_type: local
# data_filepath: s3://retrofit-data-dev/sap_change_model/2024-03-22-18-56-53/dataset_rooms.parquet
# data_filepath: s3://retrofit-data-dev/sap_change_model/2024-05-25-08-36-36/dataset_rooms.parquet
# data_filepath: s3://retrofit-data-dev/sap_change_model/2024-05-26-10-31-39/dataset_rooms.parquet
data_filepath: s3://retrofit-data-dev/sap_change_model/2024-05-28-19-08-25/dataset_rooms.parquet
train_proportion: 0.9
output_train_filepath: ./data/prepared_data/train.parquet
output_test_filepath: ./data/prepared_data/test.parquet
feature_processor:
feature_processor_type: dataframe
feature_processor_config:
subsample_amount: null
subsample_seed: 0
target: sap_ending
identifier_columns: ["uprn"]
# drop_columns: ["heat_demand_change", "carbon_change", "rdsap_change", "heat_demand_ending", "carbon_ending", "days_to_starting", "days_to_ending"]
drop_columns: [
"heat_demand_change", "carbon_change", "rdsap_change", "heat_demand_ending", "carbon_ending", "days_to_starting", "days_to_ending",
'number_habitable_rooms_starting', 'number_habitable_rooms_ending', 'number_heated_rooms_starting', 'number_heated_rooms_ending',
'number_habitable_rooms', 'number_heated_rooms']
retain_features: null
# retain_features: ['uprn', 'sap_starting', 'hot_water_energy_eff_ending',
# 'mainheat_energy_eff_ending', 'constituency', 'roof_energy_eff_ending',
# 'walls_energy_eff_ending', 'secondheat_description_ending',
# 'property_type', 'mainheatc_energy_eff_ending', 'built_form',
# 'walls_insulation_thickness_ending', 'potential_energy_efficiency',
# 'transaction_type_ending',
# 'floor_thermal_transmittance_ending',
# 'low_energy_lighting_ending', 'heat_demand_starting',
# 'photo_supply_ending', 'carbon_starting',
# 'walls_thermal_transmittance_ending',
# 'roof_insulation_thickness_ending',
# 'total_floor_area_ending', 'number_open_fireplaces_ending',
# 'windows_energy_eff_ending',
# 'floor_height_ending',
# 'extension_count_ending',
# 'has_air_source_heat_pump_ending',
# 'charging_system_ending', 'construction_age_band', 'glazed_type_ending',
# 'roof_thermal_transmittance_ending',
# 'floor_insulation_thickness_ending', 'has_mains_gas_ending',
# 'estimated_perimeter_starting', 'energy_consumption_potential',
# 'environment_impact_potential', 'heater_type_ending',
# 'multi_glaze_proportion_ending',
# 'lighting_energy_eff_ending', 'fixed_lighting_outlets_count']
generate_predictions:
input_dataclient_type: local
output_dataclient_type: local
test_data_filepath: ./data/prepared_data/test.parquet
predictions_output_filepath: ./data/predictions/predictions.parquet
predictions_column_name: predictions
identifier_column: id
generate_metrics:
dataclient_type: local
metrics_type: Regression
metrics_output_filepath: ./metrics/metrics.json
dev:
generate_predictions:
input_dataclient_type: aws-s3

View file

@ -1,190 +0,0 @@
schema: '2.0'
stages:
startup_cleanup:
cmd: python 0_startup_cleanup.py
deps:
- path: 0_startup_cleanup.py
hash: md5
md5: b1b12f6b6393fbf8b83d23684df0a3d4
size: 1220
params:
configs/settings.yaml:
default.startup_cleanup.artefacts: ./data
default.startup_cleanup.metrics: ./metrics
prepare_data:
cmd: python 1_prepare_data.py
deps:
- path: 1_prepare_data.py
hash: md5
md5: 11a3b8bfdfe199ab7ecc39ccc5652649
size: 4298
params:
configs/settings.yaml:
default.feature_processor.feature_processor_config.drop_columns:
- heat_demand_change
- carbon_change
- rdsap_change
- heat_demand_ending
- carbon_ending
- days_to_starting
- days_to_ending
- number_habitable_rooms_starting
- number_habitable_rooms_ending
- number_heated_rooms_starting
- number_heated_rooms_ending
- number_habitable_rooms
- number_heated_rooms
default.feature_processor.feature_processor_config.retain_features:
default.feature_processor.feature_processor_config.subsample_amount:
default.feature_processor.feature_processor_config.subsample_seed: 0
default.feature_processor.feature_processor_config.target: sap_ending
default.feature_processor.feature_processor_type: dataframe
default.prepare_data.data_filepath:
s3://retrofit-data-dev/sap_change_model/2024-05-28-19-08-25/dataset_rooms.parquet
default.prepare_data.input_dataclient_type: aws-s3
default.prepare_data.output_dataclient_type: local
default.prepare_data.output_test_filepath: ./data/prepared_data/test.parquet
default.prepare_data.output_train_filepath: ./data/prepared_data/train.parquet
default.prepare_data.train_proportion: 0.9
outs:
- path: data/prepared_data/
hash: md5
md5: 80c9e138146a1d96b9d16091c207e2e8.dir
size: 45056059
nfiles: 2
build_model:
cmd: python 2_build_model.py
deps:
- path: 2_build_model.py
hash: md5
md5: 7231450b78920b0c5e7c6bada496b24a
size: 4820
- path: data/prepared_data
hash: md5
md5: 80c9e138146a1d96b9d16091c207e2e8.dir
size: 45056059
nfiles: 2
params:
configs/build_model.yaml:
default:
build_model:
model_type: AutogluonAutoML
model_save_filepath: ./data/model/optimised/
fit_metrics_filepath: ./metrics/fit_metrics.json
fit_predictions_filepath: ./data/fit_predictions/predictions.parquet
SKLearnLinearRegression:
SKLearnSVMRegression:
kernel: linear
AutogluonAutoML:
output_filepath: ./data/model/allmodels/
problem_type: regression
eval_metric: mean_squared_error
time_limit: 1800
presets: medium_quality
excluded_model_types:
- RF
- CAT
- NN_TORCH
- KNN
- XT
infer_limit: 0.05
infer_limit_batch_size: 10000
ag_args_ensemble:
num_folds_parallel: 2
outs:
- path: data/fit_predictions/
hash: md5
md5: d9c9afc05e8780db47c0548b19bf7d19.dir
size: 3349989
nfiles: 1
- path: data/model/
hash: md5
md5: 13c3100e1486c27a83a8a47491077842.dir
size: 773523079
nfiles: 36
- path: metrics/fit_metrics.json
hash: md5
md5: 2ff70a2a45813e1bcdf2ea3aa8e07d4a
size: 224
generate_predictions:
cmd: python 3_generate_predictions.py
deps:
- path: 3_generate_predictions.py
hash: md5
md5: 0a70ad4dfe99414a75d1261c75a177b9
size: 2464
- path: data/model
hash: md5
md5: 13c3100e1486c27a83a8a47491077842.dir
size: 773523079
nfiles: 36
- path: data/prepared_data
hash: md5
md5: 80c9e138146a1d96b9d16091c207e2e8.dir
size: 45056059
nfiles: 2
params:
configs/settings.yaml:
default.generate_predictions.input_dataclient_type: local
default.generate_predictions.output_dataclient_type: local
default.generate_predictions.predictions_column_name: predictions
default.generate_predictions.predictions_output_filepath: ./data/predictions/predictions.parquet
default.generate_predictions.test_data_filepath: ./data/prepared_data/test.parquet
outs:
- path: data/predictions/
hash: md5
md5: 5d07bcebf3160a72bb18dfd79106e85c.dir
size: 463197
nfiles: 1
generate_metrics:
cmd: python 4_generate_metrics.py
deps:
- path: 4_generate_metrics.py
hash: md5
md5: 4fedb86d89d528f0a6597934ba3890a0
size: 3484
- path: data/predictions
hash: md5
md5: 5d07bcebf3160a72bb18dfd79106e85c.dir
size: 463197
nfiles: 1
- path: data/prepared_data
hash: md5
md5: 80c9e138146a1d96b9d16091c207e2e8.dir
size: 45056059
nfiles: 2
params:
configs/settings.yaml:
default.generate_metrics.dataclient_type: local
default.generate_metrics.metrics_output_filepath: ./metrics/metrics.json
default.generate_metrics.metrics_type: Regression
outs:
- path: metrics/metrics.json
hash: md5
md5: 3e08df02fd5c5d094bcf936e1338d596
size: 223
generate_scenerio_metrics:
cmd: python 5_generate_scenarios.py
deps:
- path: 5_generate_scenarios.py
hash: md5
md5: 40506749fefd926d47c60ff5b16db307
size: 5337
params:
configs/scenarios.yaml:
default.scenarios:
input_dataclient_type: aws-s3
output_dataclient_type: local
scenario_data_filepaths:
- s3://retrofit-data-dev/scenario_data/28-05-2024-19-22-41/recommendations_scoring_data.parquet
comparison_output_filepath: ./metrics/scenario_table.md
metrics_output_filepath: ./metrics/scenario_metrics.md
outs:
- path: metrics/scenario_metrics.md
hash: md5
md5: fa4d6d7bbd7818613800da5f8f37ea96
size: 363
- path: metrics/scenario_table.md
hash: md5
md5: d6baf100a1623cc2467c2f8221d314c9
size: 2133

View file

@ -1,87 +0,0 @@
stages:
startup_cleanup:
cmd: python 0_startup_cleanup.py
deps:
- 0_startup_cleanup.py
params:
- configs/settings.yaml:
- default.startup_cleanup.artefacts
- default.startup_cleanup.metrics
always_changed: true
prepare_data:
cmd: python 1_prepare_data.py
deps:
- 1_prepare_data.py
params:
- configs/settings.yaml:
- default.prepare_data.input_dataclient_type
- default.prepare_data.output_dataclient_type
- default.prepare_data.data_filepath
- default.prepare_data.train_proportion
- default.prepare_data.output_train_filepath
- default.prepare_data.output_test_filepath
- default.feature_processor.feature_processor_type
- default.feature_processor.feature_processor_config.subsample_amount
- default.feature_processor.feature_processor_config.subsample_seed
- default.feature_processor.feature_processor_config.target
- default.feature_processor.feature_processor_config.drop_columns
- default.feature_processor.feature_processor_config.retain_features
outs:
- data/prepared_data/
always_changed: true
build_model:
cmd: python 2_build_model.py
deps:
- 2_build_model.py
- data/prepared_data
params:
- configs/build_model.yaml:
outs:
- data/model/
- data/fit_predictions/
- metrics/fit_metrics.json
always_changed: true
generate_predictions:
cmd: python 3_generate_predictions.py
deps:
- 3_generate_predictions.py
- data/prepared_data
- data/model
params:
- configs/settings.yaml:
- default.generate_predictions.input_dataclient_type
- default.generate_predictions.output_dataclient_type
- default.generate_predictions.test_data_filepath
- default.generate_predictions.predictions_output_filepath
- default.generate_predictions.predictions_column_name
outs:
- data/predictions/
always_changed: true
generate_metrics:
cmd: python 4_generate_metrics.py
deps:
- 4_generate_metrics.py
- data/prepared_data
- data/predictions
params:
- configs/settings.yaml:
- default.generate_metrics.dataclient_type
- default.generate_metrics.metrics_type
- default.generate_metrics.metrics_output_filepath
outs:
- metrics/metrics.json
always_changed: true
generate_scenerio_metrics:
cmd: python 5_generate_scenarios.py
deps:
- 5_generate_scenarios.py
params:
- configs/scenarios.yaml:
- default.scenarios
outs:
- metrics/scenario_table.md
- metrics/scenario_metrics.md
always_changed: true
metrics:
- metrics/metrics.json
- metrics/fit_metrics.json

View file

@ -1,248 +0,0 @@
"""
Doing some eda on dataset
"""
# Look at response variable
from matplotlib import pyplot as plt
import pandas as pd
train_df = pd.read_parquet("./data/prepared_data/train.parquet")
target = "SAP_ENDING"
train_df = train_df.head(10000)
# train_df[target].plot(kind='hist')
# Plot the target variable
fig, ax = plt.subplots(figsize=(10, 7))
ax.hist(train_df[target], bins=range(min(train_df[target]), max(train_df[target])))
fig
# Find correlation to sale price (numeric)
train_df.dtypes
# All numerical
train_df_corr = train_df.corr()
train_df_corr.style.background_gradient(cmap="coolwarm")
train_df_corr["EXTENSION_COUNT_ENDING"]
# Check out some correlation plots between variables
# sap starting - negative correlation
train_df[[target, "SAP_STARTING"]].plot(y=target, x="SAP_STARTING", style="o")
# head demand - light positive correlation
train_df[[target, "HEAT_DEMAND_STARTING"]].plot(
x=target, y="HEAT_DEMAND_STARTING", style="o"
)
# Both make sense: i.e. the higher the sap, the lower we predict and the higher the heat demand, the higher we predict
# Load the autogluon model and check feature importance
import os
import yaml
import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.Logger import logger
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
client_path = Path(__file__).parent / "configs" / "client.yaml"
client_params = yaml.safe_load(open(client_path))
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
generate_predictions_path = (
Path(__file__).parent / "configs" / "generate_predictions.yaml"
)
generate_predictions_params = yaml.safe_load(open(generate_predictions_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
model = model_factory(build_model_params["model_type"])
model_filepath = build_model_params["model_save_filepath"]
model.load_model(model_filepath)
fi = model.model.feature_importance(train_df.reset_index(drop=True))
pred = pd.read_parquet("./data/predictions/predictions.parquet")
test_df = pd.read_parquet("./data/prepared_data/test.parquet")
# test_df = test_df.head(1000)
test_df["predictions"] = pred["predictions"]
test_df.groupby("PROPERTY_TYPE").apply(
lambda x: (x.SAP_ENDING - x.predictions).abs().mean()
)
test_df.head()
flat_df = test_df[test_df["PROPERTY_TYPE"] == "Flat"]
flat_df["residual"] = abs(flat_df["predictions"] - flat_df[target])
generate_metrics_path = Path(__file__).parent / "configs" / "generate_metrics.yaml"
generate_metrics_params = yaml.safe_load(open(generate_metrics_path))
from core.MLMetrics import metrics_factory
metrics = metrics_factory(generate_metrics_params["metrics_type"])
metrics_output = metrics.generate_metrics(
target=flat_df[target],
predictions=pd.Series(flat_df["predictions"]),
)
# Use alibi to run permutation importance
from alibi.explainers import PermutationImportance, plot_permutation_importance
from sklearn.metrics import mean_absolute_percentage_error
import numpy as np
import pandas as pd
test_df = pd.read_parquet("./data/prepared_data/test.parquet")
test_df = test_df.head(1000)
target = "SAP_ENDING"
feature_names = test_df.columns.to_list()
feature_names.remove(target)
x = test_df[feature_names].to_numpy()
y = test_df[target].to_numpy()
def predict_fn(X: np.ndarray) -> np.ndarray:
return model.predict(pd.DataFrame(X, columns=feature_names))
pfi = PermutationImportance(
predictor=predict_fn,
loss_fns=mean_absolute_percentage_error,
feature_names=feature_names,
verbose=True,
)
exp = pfi.explain(x, y)
plot_permutation_importance(exp, fig_kw={"figwidth": 7, "figheight": 6})
[
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTITUENCY",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
"FIXED_LIGHTING_OUTLETS_COUNT",
"CONSTRUCTION_AGE_BAND",
"TRANSACTION_TYPE_STARTING",
"LIGHTING_DESCRIPTION_STARTING",
"MAINHEAT_DESCRIPTION_STARTING",
"HOTWATER_DESCRIPTION_STARTING",
"MAIN_FUEL_STARTING",
"MECHANICAL_VENTILATION_STARTING",
"SECONDHEAT_DESCRIPTION_STARTING",
"ENERGY_TARIFF_STARTING",
"SOLAR_WATER_HEATING_FLAG_STARTING",
"PHOTO_SUPPLY_STARTING",
"WINDOWS_DESCRIPTION_STARTING",
"GLAZED_TYPE_STARTING",
"MULTI_GLAZE_PROPORTION_STARTING",
"LOW_ENERGY_LIGHTING_STARTING",
"NUMBER_OPEN_FIREPLACES_STARTING",
"MAINHEATCONT_DESCRIPTION_STARTING",
"EXTENSION_COUNT_STARTING",
"TOTAL_FLOOR_AREA_STARTING",
"FLOOR_HEIGHT_STARTING",
"DAYS_TO_STARTING",
"WALLS_DESCRIPTION_STARTING",
"FLOOR_DESCRIPTION_STARTING",
]
# Use shap package to explain why 9158 has a 35 prediction when its sap ending is 96
#
#
from core.MLModels import model_factory
from core.DataClient import dataclient_factory
import pandas as pd
from config import settings
client_params = settings.client
prepare_data_params = settings.prepare_data
feature_process_params = settings.feature_processor
build_model_params = settings.build_model
generate_predictions_params = settings.generate_predictions
prediction_analysis_params = settings.prediction_analysis
model = model_factory(build_model_params["model_type"])
model.load_model(build_model_params["model_save_filepath"])
dataclient_type = prediction_analysis_params["dataclient_type"]
# dataclient_type = 'aws-s3'
# dataclient = dataclient_factory(
# dataclient_type=dataclient_type,
# dataclient_config=client_params[dataclient_type],
# )
# data = dataclient.load_data("s3://retrofit-data-dev/sap_change_model/dataset.parquet")
target = feature_process_params["feature_processor_config"]["target"]
predictions_column_name = generate_predictions_params["predictions_column_name"]
output_test_filepath = prepare_data_params["output_test_filepath"]
predictions_output_filepath = generate_predictions_params["predictions_output_filepath"]
# score_data = dataclient.load_data("s3://retrofit-data-dev/carbon_change_predictions/51/2023-11-28T21:01:21.869339.parquet")
local_dataclient = dataclient_factory(
dataclient_type="local",
dataclient_config=client_params["local"],
)
test_df = local_dataclient.load_data(output_test_filepath)
predictions = local_dataclient.load_data(predictions_output_filepath)
mix_df = pd.concat([test_df.copy(), predictions], axis=1)
mix_df["residual"] = abs(mix_df[predictions_column_name] - mix_df[target])
mix_df = mix_df.sort_values("residual", ascending=False)
cosine_similarity_df = mix_df[mix_df.columns.difference(["predictions", "residual"])]
from sklearn.metrics.pairwise import cosine_similarity
row_index = 0
from sklearn.preprocessing import LabelEncoder
object_columns = cosine_similarity_df.select_dtypes(["object"])
cosine_similarity_df[object_columns.columns] = cosine_similarity_df[
object_columns.columns
].apply(LabelEncoder().fit_transform)
feature_vector = cosine_similarity_df.loc[[row_index]]
cosine_similarity_df["cosine"] = cosine_similarity(cosine_similarity_df, feature_vector)
similar_index = (
cosine_similarity_df.sort_values("cosine", ascending=False).head(15).index
)
check_df = mix_df.loc[similar_index]
columns_to_check = [
"LOW_ENERGY_LIGHTING_ENDING",
"walls_thermal_transmittance_ENDING",
"floor_thermal_transmittance_ENDING",
"roof_thermal_transmittance_ENDING",
"roof_insulation_thickness_ENDING",
]
cosine_similarity_df = mix_df[columns_to_check]

View file

@ -1,56 +0,0 @@
import pandas as pd
from configs.post_prediction_logic import post_prediction_logic
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.Logger import logger
def generate_predictions(
input_dataclient: DataClient,
output_dataclient: DataClient,
model: MLModel,
target: str,
model_filepath: str,
test_data_filepath: str,
predictions_output_filepath: str,
predictions_column_name: str,
identifier_column: str = "id",
):
"""
For a given model, we generate prediction and evaluate this against the true target
"""
logger.info("--- Loading test data ---")
test_data = input_dataclient.load_data(
location=test_data_filepath, load_config=None
)
logger.info("--- Loading model ---")
model.load_model(model_filepath)
logger.info("--- Generating predictions ---")
prediction_data = (
test_data.drop(columns=target) if target in test_data.columns else test_data
)
predictions = model.predict(
data=prediction_data, post_prediction_logic=post_prediction_logic
)
logger.info("--- Saving predictions ---")
predictions_df = pd.DataFrame(predictions)
predictions_df.columns = [predictions_column_name]
output_df = (
pd.concat([test_data[identifier_column], predictions_df], axis=1)
if identifier_column in test_data.columns
else predictions_df
)
output_dataclient.save_data(
obj=output_df, location=predictions_output_filepath, save_config=None
)

View file

@ -1,4 +0,0 @@
/fit_metrics.json
/metrics.json
/scenario_table.md
/scenario_metrics.md

View file

@ -1,137 +0,0 @@
"""
Post Model generation step:
We want to look at feature analysis of the model
"""
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.Logger import logger
from core.MLModels import model_factory
from core.DataClient import dataclient_factory
from alibi.explainers import PermutationImportance, plot_permutation_importance
import numpy as np
import pandas as pd
from config import settings
client_params = settings.client
prepare_data_params = settings.prepare_data
feature_process_params = settings.feature_processor
build_model_params = settings.build_model
generate_predictions_params = settings.generate_predictions
model_analysis_params = settings.model_analysis
model = model_factory(build_model_params["model_type"])
model.load_model(build_model_params["model_save_filepath"])
dataclient_type = model_analysis_params["dataclient_type"]
dataclient = dataclient_factory(
dataclient_type=dataclient_type,
dataclient_config=client_params[dataclient_type],
)
feature_importance_filepath = model_analysis_params["feature_importance_filepath"]
permutation_subsample_amount = model_analysis_params["permutation_subsample_amount"]
loss_fns = model_analysis_params["loss_fns"]
feature_importance_column = model_analysis_params["feature_importance_column"]
n_repeats = model_analysis_params["n_repeats"]
figwidth = model_analysis_params["figwidth"]
figheight = model_analysis_params["figheight"]
target = feature_process_params["feature_processor_config"]["target"]
output_test_filepath = prepare_data_params["output_test_filepath"]
def model_analysis(
model: MLModel,
dataclient: DataClient,
target: str,
output_test_filepath: str,
feature_importance_filepath: str,
permutation_subsample_amount: int = 100,
loss_fns: str = "mean_absolute_percentage_error",
feature_importance_column: str = "importance",
n_repeats: int = 5,
figwidth: int = 7,
figheight: int = 6,
):
"""
Key task is to take in a model and generate:
- feature importance
and save these outputs
"""
logger.info("------------------------------------")
logger.info(f"--- Generate Feature Importance ---")
logger.info("------------------------------------")
test_df = dataclient.load_data(output_test_filepath)
test_df = test_df.head(permutation_subsample_amount)
feature_names = test_df.columns.to_list()
feature_names.remove(target)
x = test_df[feature_names].to_numpy()
y = test_df[target].to_numpy()
def predict_fn(X: np.ndarray) -> np.ndarray:
return model.predict(pd.DataFrame(X, columns=feature_names))
pfi = PermutationImportance(
predictor=predict_fn,
loss_fns=loss_fns,
feature_names=feature_names,
verbose=True,
)
logger.info(
f"Permutation feature importance - using {permutation_subsample_amount} samples and {n_repeats} shuffles per feature:"
)
exp = pfi.explain(x, y, n_repeats=n_repeats)
mean_value_feature_importance = [
element["mean"] for element in exp.data["feature_importance"][0]
]
feature_importance_df = pd.DataFrame(
mean_value_feature_importance,
index=exp.data["feature_names"],
columns=[feature_importance_column],
).sort_values(feature_importance_column, ascending=False)
plot_permutation_importance(
exp, fig_kw={"figwidth": figwidth, "figheight": figheight}
)
logger.info("--------------------------------------")
logger.info(f"--- Save Feature Importance table ---")
logger.info("--------------------------------------")
dataclient.save_data(feature_importance_df, location=feature_importance_filepath)
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
model_analysis(
model=model,
dataclient=dataclient,
target=target,
output_test_filepath=output_test_filepath,
feature_importance_filepath=feature_importance_filepath,
permutation_subsample_amount=permutation_subsample_amount,
loss_fns=loss_fns,
feature_importance_column=feature_importance_column,
n_repeats=n_repeats,
figwidth=figwidth,
figheight=figheight,
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")

View file

@ -1,134 +0,0 @@
"""
Look at why the model made such a prediction
Manual script to run
Workflow:
- Identify a prediction row/s that you wish to look into
- i.e. a bad prediction/s
- Add these rows to the config
- Run script
"""
import shap
shap.initjs()
from typing import List
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.Logger import logger
from core.MLModels import model_factory
from core.DataClient import dataclient_factory
import pandas as pd
from config import settings
client_params = settings.client
prepare_data_params = settings.prepare_data
feature_process_params = settings.feature_processor
build_model_params = settings.build_model
generate_predictions_params = settings.generate_predictions
prediction_analysis_params = settings.prediction_analysis
model = model_factory(build_model_params["model_type"])
model.load_model(build_model_params["model_save_filepath"])
dataclient_type = prediction_analysis_params["dataclient_type"]
dataclient = dataclient_factory(
dataclient_type=dataclient_type,
dataclient_config=client_params[dataclient_type],
)
target = feature_process_params["feature_processor_config"]["target"]
predictions_column_name = generate_predictions_params["predictions_column_name"]
output_test_filepath = prepare_data_params["output_test_filepath"]
predictions_output_filepath = generate_predictions_params["predictions_output_filepath"]
nshap_samples = prediction_analysis_params["nshap_samples"]
row_index = prediction_analysis_params["row_index"]
def prediction_analysis(
model: MLModel,
dataclient: DataClient,
target: str,
predictions_column_name: str,
output_test_filepath: str,
predictions_output_filepath: str,
nshap_samples: int,
row_index: List[int],
):
test_df = dataclient.load_data(output_test_filepath)
predictions = dataclient.load_data(predictions_output_filepath)
mix_df = pd.concat([test_df.copy(), predictions], axis=1)
mix_df["residual"] = abs(mix_df[predictions_column_name] - mix_df[target])
mix_df = mix_df.sort_values("residual", ascending=False)
test_df_without_target = test_df.drop(columns=[target])
class ModelWrapper:
def __init__(self, model, feature_names):
self.model = model
self.feature_names = feature_names
def predict(self, X):
if isinstance(X, pd.Series):
X = X.values.reshape(1, -1)
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X, columns=self.feature_names)
return self.model.predict(X)
model_wrapper = ModelWrapper(model, feature_names=test_df_without_target.columns)
explainer = shap.KernelExplainer(
model_wrapper.predict, test_df_without_target.head(100)
)
shap_predictions_df = pd.DataFrame(index=test_df_without_target.columns)
for index in row_index:
single_datapoint = test_df_without_target.iloc[[index]]
# single_prediction = model_wrapper.predict(single_datapoint)
shap_values_single = explainer.shap_values(
single_datapoint, nsamples=nshap_samples
)
shap.force_plot(
explainer.expected_value,
shap_values_single,
test_df_without_target.iloc[index, :],
)
shap_single_prediction_df = pd.DataFrame(
shap_values_single, columns=test_df_without_target.columns
).T
shap_single_prediction_df.columns = [index]
shap_single_prediction_df = shap_single_prediction_df.sort_values(index)
shap_predictions_df = pd.merge(
left=shap_predictions_df,
right=shap_single_prediction_df,
left_index=True,
right_index=True,
)
return shap_predictions_df
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
prediction_analysis(
model=model,
dataclient=dataclient,
target=target,
predictions_column_name=predictions_column_name,
output_test_filepath=output_test_filepath,
nshap_samples=nshap_samples,
row_index=row_index,
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")

View file

@ -1,7 +0,0 @@
joblib==1.3.2
boto3==1.28.17
pandas==2.1.4
autogluon.tabular[all]==1.0.0
dynaconf==3.2.1
pyarrow==13.0.0
pre-commit==3.3.3

View file

@ -1,7 +0,0 @@
joblib==1.3.2
boto3==1.28.17
pandas==2.1.4
autogluon.tabular[all]==1.0.0
dynaconf==3.2.1
pyarrow==13.0.0
PyYAML==6.0.1

View file

@ -1,10 +0,0 @@
joblib==1.3.2
boto3==1.28.17
pandas==2.1.4
autogluon.tabular[all]==1.0.0
ray==2.6.3
dynaconf==3.2.1
alibi==0.9.5
shap==0.42.1
pyarrow==13.0.0
pre-commit==3.3.3

View file

@ -1,4 +0,0 @@
boto3==1.28.41
pandas==2.1.4
autogluon.tabular[all]==1.0.0
dynaconf==3.2.1

View file

@ -1,4 +0,0 @@
dvc==3.51.0
dvc-s3==3.2.0
gto==1.7.1
pyOpenSSL==23.3.0

Binary file not shown.

View file

@ -0,0 +1,3 @@
/config.local
/tmp
/cache

View file

@ -0,0 +1,3 @@
# Add patterns of files dvc should ignore, which could improve
# the performance. Learn more at
# https://dvc.org/doc/user-guide/dvcignore

View file

@ -0,0 +1 @@
{}

View file

@ -0,0 +1,35 @@
# Training
This folder contains the code base for training experimentation.
To understand the pipeline, run `dvc dag`
There are 3 main steps:
- Preparing data
- This is loading data (locally or from s3)
- Splitting the data into train and validation
- Creating additional features (if needed)
- **Data is cached**
- This will be down to the dvc remote location
- Build model
- For the prepared data, we build a model using our configurations
- Model is saved (locally or s3)
- **Model is cached**
- This will be down to the dvc remote location
- Generate Metrics
- For the given model, we generate metrics on validation data/test data
- **Metrics are cached**
- This will be down to the dvc remote location
Workflow:
- Use `dvc metrics show` to view current metrics score
- Adjust parameters/ codebase
- When happy with changes, use `dvc exp run` to trigger an experiment
- Due to cache, only need stages are re-run
- Use `dvc metrics diff` to check the change in metrics
- Use `dvc exp show` to view all experiments
- NOTE: the last experiment will always be applied to the workspace!
- After running experiments, you can apply the the best model to workspace using `dvc exp apply [EXPERIMENT_NAME]`
- This experiment will have the corresponding .dvc files for the hashed model and data
- Use version control as normal
- git add, git commit etc

View file

@ -0,0 +1,106 @@
"""
Second Pipieline step:
Once we have the features, we build a model
"""
import os
import yaml
import pandas as pd
from typing import Union
from pathlib import Path
from core.Logger import logger
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def build_model(
dataclient: DataClient,
model: MLModel,
target: str,
model_save_location: str,
model_hyperparameters: dict,
train_filepath: Union[str, None] = None,
test_filepath: Union[str, None] = None,
train_data: Union[pd.DataFrame, None] = None,
test_data: Union[pd.DataFrame, None] = None,
pipeline_mode: bool = False,
):
logger.info("--------------------------------------")
logger.info("--- Loading Data for build process ---")
logger.info("--------------------------------------")
if train_data is None:
if train_filepath is None:
raise ValueError(f"Need {train_filepath} if no data supplied")
train_data = dataclient.load_data(location=train_filepath)
if test_data is None:
if test_filepath is None:
raise ValueError(f"Need {test_filepath} if no data supplied")
test_data = dataclient.load_data(location=test_filepath)
logger.info("----------------------")
logger.info("--- Training model ---")
logger.info("----------------------")
model.train_model(
data=train_data, target=target, model_hyperparameters=model_hyperparameters
)
logger.info("--------------------")
logger.info("--- Saving model ---")
logger.info("--------------------")
model.save_model(path=Path(model_save_location))
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
logger.info("----------------------------")
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
# Output of previous prepare data step, will be where the data is
dataclient = dataclient_factory(prepare_data_params["output_dataclient_type"])
logger.info("-------------------------")
logger.info(f"--- Initiate MLModel ---")
logger.info("-------------------------")
model_type = build_model_params["model_type"]
model = model_factory(model_type)
logger.info("--------------------------")
logger.info(f"--- Build Model Stage ---")
logger.info("--------------------------")
build_model(
dataclient=dataclient,
model=model,
target=feature_process_params["feature_processor_config"]["target"],
model_save_location=build_model_params["model_save_filepath"],
model_hyperparameters=build_model_params[model_type],
train_filepath=prepare_data_params["output_train_filepath"],
test_filepath=prepare_data_params["output_test_filepath"],
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")

View file

@ -0,0 +1,15 @@
model_type: SKLearnLinearRegression
model_save_filepath: ./data/model/model.joblib
SKLearnLinearRegression: null
SKLearnSVMRegression:
kernel: "linear"
AutogluonAutoML:
output_filepath: ./data/model/autogluonmodel/
problem_type: regression
eval_metric: mean_absolute_error
time_limit: 400
presets: high_quality
excluded_model_types: ['KNN']

View file

@ -0,0 +1,10 @@
aws-s3:
AWS_ACCESS_KEY_ID: null
AWS_SECRET_ACCESS_KEY: null
ENDPOINT_URL: null
aws-s3-mock:
AWS_ACCESS_KEY_ID: minio
AWS_SECRET_ACCESS_KEY: minio123
ENDPOINT_URL: http://localhost:9000
local:
null

View file

@ -0,0 +1,3 @@
"""
Stitch all yaml configuration files together, override some settings (such as bucket location) based off environment variables
"""

View file

@ -0,0 +1,8 @@
feature_processor_type: dataframe
feature_processor_config:
subsample_amount: null
subsample_seed: 0
target: RDSAP_CHANGE
drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE", "CARBON_CHANGE"]
retain_features: ["TOTAL_FLOOR_AREA_STARTING", "SAP_STARTING", "HEAT_DEMAND_STARTING", "CARBON_STARTING", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "FIXED_LIGHTING_OUTLETS_COUNT", "PHOTO_SUPPLY_STARTING", "MULTI_GLAZE_PROPORTION_STARTING", "LOW_ENERGY_LIGHTING_STARTING", "NUMBER_OPEN_FIREPLACES_STARTING", "EXTENSION_COUNT_STARTING", "FLOOR_HEIGHT_STARTING", "PHOTO_SUPPLY_ENDING", "MULTI_GLAZE_PROPORTION_ENDING", "LOW_ENERGY_LIGHTING_ENDING", "NUMBER_OPEN_FIREPLACES_ENDING", "EXTENSION_COUNT_ENDING", "TOTAL_FLOOR_AREA_ENDING", "FLOOR_HEIGHT_ENDING", "DAYS_TO_STARTING", "DAYS_TO_ENDING"]
# retain_features: null

View file

@ -0,0 +1,13 @@
"""
During the feature processor step, we can apply additional business logic and feature generation by defining them here
"""
"""
Business Logic dict + functions
"""
business_logic = {}
"""
New features dict + function
"""
new_feature_funcs = {}

View file

@ -0,0 +1,5 @@
dataclient_type: local
input_datahandler_type: parquet
output_datahandler_type: json
metrics_type: Regression
metrics_output_filepath: ./metrics/metrics.json

View file

@ -0,0 +1,5 @@
input_dataclient_type: local
output_dataclient_type: local
test_data_filepath: ./data/prepared_data/test.parquet
predictions_output_filepath: ./data/predictions/predictions.parquet
predictions_column_name: predictions

View file

@ -0,0 +1,9 @@
input_dataclient_type: aws-s3
output_dataclient_type: local
datahandler_type: parquet
data_filepath: s3://retrofit-data-dev/sap_change_model/dataset.parquet
train_proportion: 0.9
output_train_filepath: ./data/prepared_data/train.parquet
output_test_filepath: ./data/prepared_data/test.parquet
# cache_o

View file

@ -0,0 +1,2 @@
artefacts: ./data
metrics: ./metrics

View file

@ -8,7 +8,7 @@ import boto3
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
from io import BytesIO from io import BytesIO
from typing import List, Union, Any from typing import List, Union
from core.interface.InterfaceDataClient import DataClient from core.interface.InterfaceDataClient import DataClient
from core.Logger import logger from core.Logger import logger
@ -105,7 +105,7 @@ class AWSS3Client:
def save_data( def save_data(
self, self,
obj: Any, obj: object,
location: str, location: str,
save_config: Union[dict, None] = None, save_config: Union[dict, None] = None,
) -> None: ) -> None:
@ -134,7 +134,7 @@ class AWSS3Client:
obj=obj, location=location, save_config=save_config obj=obj, location=location, save_config=save_config
) )
def _save_parquet(self, obj: pd.DataFrame, location: str, save_config: dict): def _save_parquet(self, obj: object, location: str, save_config: dict):
""" """
Save object as parquet Save object as parquet
""" """
@ -142,15 +142,9 @@ class AWSS3Client:
buffer = BytesIO() buffer = BytesIO()
obj.to_parquet(buffer, index=False) obj.to_parquet(buffer, index=False)
# Reset the buffer position to the beginning
buffer.seek(0)
bucket, key = location.strip("s3://").split("/", 1) bucket, key = location.strip("s3://").split("/", 1)
self.client.upload_fileobj(buffer, bucket, key) self.client.upload_fileobj(buffer, bucket, key)
# Close the buffer
buffer.close()
def _load_parquet(self, location: str, load_config: dict) -> pd.DataFrame: def _load_parquet(self, location: str, load_config: dict) -> pd.DataFrame:
""" """
Load a parquet file Load a parquet file
@ -245,8 +239,7 @@ class LocalClient:
save_methods = { save_methods = {
".parquet": self._save_parquet, ".parquet": self._save_parquet,
".json": self._save_json, ".json": self._save_json
".md": self._save_md,
# "": _save_directory(**save_config), # "": _save_directory(**save_config),
# ADD MORE save_methods HERE # ADD MORE save_methods HERE
} }
@ -295,10 +288,3 @@ class LocalClient:
# Write the contents of the buffer to the local file # Write the contents of the buffer to the local file
with open(location, "wb") as f: with open(location, "wb") as f:
f.write(buffer.getvalue()) f.write(buffer.getvalue())
def _save_md(self, obj: pd.DataFrame, location: str, save_config: dict):
"""
Save object as markdown
"""
obj.to_markdown(location, **save_config)

View file

@ -9,6 +9,7 @@ Create additional features from the dataset
import pandas as pd import pandas as pd
from typing import List, Callable, Union from typing import List, Callable, Union
from core.interface.InterfaceFeatureProcessor import FeatureProcessor from core.interface.InterfaceFeatureProcessor import FeatureProcessor
from core.Logger import logger
def feature_processor_factory(feature_processor_type: str) -> FeatureProcessor: def feature_processor_factory(feature_processor_type: str) -> FeatureProcessor:
@ -109,9 +110,7 @@ class DataFrameFeatureProcessor:
# TODO: to test # TODO: to test
for key, value in new_feature_funcs.items(): for key, value in new_feature_funcs.items():
key_column = value(df) df[key] = value(df)
key_column.name = key
df = pd.concat([df, key_column], axis=1)
return df return df
@ -136,8 +135,6 @@ class DataFrameFeatureProcessor:
subsample_amount=feature_processor_config["subsample_amount"], subsample_amount=feature_processor_config["subsample_amount"],
subsample_seed=feature_processor_config["subsample_seed"], subsample_seed=feature_processor_config["subsample_seed"],
) )
df = self.apply_business_logic(df, business_logic=business_logic)
df = self.generate_new_features(df, new_feature_funcs=new_feature_funcs)
df = self.drop_unused_columns( df = self.drop_unused_columns(
df, drop_columns=feature_processor_config["drop_columns"] df, drop_columns=feature_processor_config["drop_columns"]
) )
@ -146,4 +143,6 @@ class DataFrameFeatureProcessor:
retain_features=feature_processor_config["retain_features"], retain_features=feature_processor_config["retain_features"],
target=feature_processor_config["target"], target=feature_processor_config["target"],
) )
df = self.apply_business_logic(df, business_logic=business_logic)
df = self.generate_new_features(df, new_feature_funcs=new_feature_funcs)
return df return df

View file

@ -21,7 +21,6 @@ def setup_logger():
# Add the stream handler to the logger # Add the stream handler to the logger
logger.addHandler(stream_handler) logger.addHandler(stream_handler)
logger.propagate = False
return logger return logger

View file

@ -4,7 +4,6 @@ Implementation of MLMetrics, all of which will have two methods:
- Generate Plot Suite - Generate Plot Suite
""" """
import numpy as np
import pandas as pd import pandas as pd
from typing import Union from typing import Union
from sklearn.metrics import ( from sklearn.metrics import (
@ -15,18 +14,6 @@ from sklearn.metrics import (
) )
from core.interface.InterfaceMetrics import MLMetrics from core.interface.InterfaceMetrics import MLMetrics
# Define the function to return the SMAPE value
def symmetric_mape(actual, predicted) -> float:
# Convert actual and predicted to numpy
# array data type if not already
if not all([isinstance(actual, np.ndarray), isinstance(predicted, np.ndarray)]):
actual, predicted = np.array(actual), np.array(predicted)
return np.mean(
np.abs(predicted - actual) / ((np.abs(predicted) + np.abs(actual)) / 2)
)
def metrics_factory(metrics_type: str) -> MLMetrics: def metrics_factory(metrics_type: str) -> MLMetrics:
metrics = { metrics = {
@ -47,7 +34,7 @@ class RegressionMetrics:
median_absolute_error, median_absolute_error,
mean_squared_error, mean_squared_error,
mean_absolute_percentage_error, mean_absolute_percentage_error,
symmetric_mape, # max_error
] ]
def generate_metrics( def generate_metrics(

View file

@ -25,7 +25,7 @@ def model_factory(model_type: str) -> MLModel:
models = { models = {
"SKLearnLinearRegression": SKLearnLinearRegression(), "SKLearnLinearRegression": SKLearnLinearRegression(),
"SKLearnSVMRegression": SKLearnSVMRegression(), "SKLearnSVMRegression": SKLearnSVMRegression(),
"AutogluonAutoML": AutogluonAutoML(), "AutogluonAutoML": AutogluonAutoML()
# ADD OTHER MODELS HERE # ADD OTHER MODELS HERE
} }
@ -75,9 +75,7 @@ class SKLearnLinearRegression:
y_train = data[target] y_train = data[target]
self.model.fit(x_train, y_train) self.model.fit(x_train, y_train)
def predict( def predict(self, data: pd.DataFrame) -> pd.Series:
self, data: pd.DataFrame, post_prediction_logic: dict | None = None
) -> pd.Series:
""" """
Method to predict Method to predict
""" """
@ -130,9 +128,7 @@ class SKLearnSVMRegression:
y_train = data[target] y_train = data[target]
self.model.fit(x_train, y_train) self.model.fit(x_train, y_train)
def predict( def predict(self, data: pd.DataFrame) -> pd.Series:
self, data: pd.DataFrame, post_prediction_logic: dict | None = None
) -> pd.Series:
""" """
Method to predict Method to predict
""" """
@ -149,9 +145,6 @@ class AutogluonAutoML:
"time_limit", "time_limit",
"presets", "presets",
"excluded_model_types", "excluded_model_types",
"infer_limit",
"infer_limit_batch_size",
"ag_args_ensemble",
] ]
def load_model(self, path: Union[Path, str]) -> None: def load_model(self, path: Union[Path, str]) -> None:
@ -168,12 +161,8 @@ class AutogluonAutoML:
if self.model is None: if self.model is None:
raise KeyError("No model trained/ loaded - unable to save") raise KeyError("No model trained/ loaded - unable to save")
logger.info( logger.info("In local development mode - no need for s3 client")
"Using AutoGluon Model - Model saving is using optimised deployment mode" logger.info("Using AutoGluon Model - Model saving already occured")
)
logger.info("Saving optimised model")
self.model.clone_for_deployment(str(path))
return str(path) return str(path)
@ -206,44 +195,17 @@ class AutogluonAutoML:
time_limit=model_hyperparameters["time_limit"], time_limit=model_hyperparameters["time_limit"],
presets=model_hyperparameters["presets"], presets=model_hyperparameters["presets"],
excluded_model_types=model_hyperparameters["excluded_model_types"], excluded_model_types=model_hyperparameters["excluded_model_types"],
infer_limit=model_hyperparameters["infer_limit"],
infer_limit_batch_size=model_hyperparameters["infer_limit_batch_size"],
ag_args_ensemble=model_hyperparameters["ag_args_ensemble"],
) )
def predict( def predict(self, data: pd.DataFrame) -> pd.Series:
self, data: pd.DataFrame, post_prediction_logic: dict | None = None
) -> pd.Series:
""" """
Method to predict Method to predict
""" """
if post_prediction_logic is None:
post_prediction_logic = {}
if self.model is None: if self.model is None:
print("No model loaded/ trained") print("No model loaded/ trained")
exit(1) exit(1)
predictions = pd.Series(self.model.predict(data)) predictions = pd.Series(self.model.predict(data))
if len(post_prediction_logic) != 0:
predictions = self._apply_post_prediction_logic(
data=data,
predictions=predictions,
post_prediction_logic=post_prediction_logic,
)
return predictions
def _apply_post_prediction_logic(
self, data: pd.DataFrame, predictions: pd.Series, post_prediction_logic: dict
):
"""
For predictions, we can apply post processing logic to clean up predictions
"""
for _, value in post_prediction_logic.items():
predictions = value(data, predictions)
return predictions return predictions

View file

@ -3,7 +3,8 @@ Interface for all DataClient i.e. s3, database, local etc
""" """
import pandas as pd import pandas as pd
from typing import Protocol, Union, Any from io import BytesIO
from typing import Protocol, Union
class DataClient(Protocol): class DataClient(Protocol):
@ -21,10 +22,9 @@ class DataClient(Protocol):
""" """
Generic to load data Generic to load data
""" """
...
def save_data( def save_data(
self, obj: Any, location: str, save_config: Union[dict, None] self, obj: object, location: str, save_config: Union[dict, None]
) -> None: ) -> None:
""" """
Generic to save data Generic to save data

View file

@ -32,9 +32,7 @@ class MLModel(Protocol):
""" """
... ...
def predict( def predict(self, data: pd.DataFrame) -> pd.Series:
self, data: pd.DataFrame, post_prediction_logic: dict | None
) -> pd.Series:
""" """
Method to predict Method to predict
""" """

View file

@ -0,0 +1,3 @@
/prepared_data
/model
/predictions

View file

@ -0,0 +1,123 @@
schema: '2.0'
stages:
prepare_data:
cmd: python prepare_data.py
deps:
- path: prepare_data.py
hash: md5
md5: 2cfe9e3012280e0cecdb84da12c974d9
size: 5009
params:
configs/prepare_data.yaml:
output_test_filepath: ./data/prepared_data/test.parquet
output_train_filepath: ./data/prepared_data/train.parquet
train_proportion: 0.9
outs:
- path: data/prepared_data/
hash: md5
md5: ea0a2baf3931e692d6344ba609331089.dir
size: 13232732
nfiles: 2
build_model:
cmd: python build_model.py
deps:
- path: build_model.py
hash: md5
md5: 46bcc34f20c6851cd987640889eefde6
size: 3671
- path: data/prepared_data
hash: md5
md5: ea0a2baf3931e692d6344ba609331089.dir
size: 13232732
nfiles: 2
params:
configs/build_model.yaml:
AutogluonAutoML:
output_filepath: ./data/model/autogluonmodel/
problem_type: regression
eval_metric: mean_absolute_error
time_limit: 400
presets: high_quality
excluded_model_types:
- KNN
SKLearnLinearRegression:
SKLearnSVMRegression:
kernel: linear
model_save_filepath: ./data/model/model.joblib
model_type: SKLearnLinearRegression
outs:
- path: data/model/
hash: md5
md5: eb2b910dec66481e75bb6058622f6e55.dir
size: 1832
nfiles: 1
generate_predictions:
cmd: python generate_predictions.py
deps:
- path: data/model
hash: md5
md5: eb2b910dec66481e75bb6058622f6e55.dir
size: 1832
nfiles: 1
- path: data/prepared_data
hash: md5
md5: ea0a2baf3931e692d6344ba609331089.dir
size: 13232732
nfiles: 2
- path: generate_predictions.py
hash: md5
md5: d412c8c9b48b59a29f569633280a6e7f
size: 4237
params:
configs/generate_predictions.yaml:
input_dataclient_type: local
output_dataclient_type: local
predictions_column_name: predictions
predictions_output_filepath: ./data/predictions/predictions.parquet
test_data_filepath: ./data/prepared_data/test.parquet
outs:
- path: data/predictions/
hash: md5
md5: 85ec3fa0cb387a7775eccd23185f7966.dir
size: 643406
nfiles: 1
generate_metrics:
cmd: python generate_metrics.py
deps:
- path: data/predictions
hash: md5
md5: 85ec3fa0cb387a7775eccd23185f7966.dir
size: 643406
nfiles: 1
- path: data/prepared_data
hash: md5
md5: ea0a2baf3931e692d6344ba609331089.dir
size: 13232732
nfiles: 2
- path: generate_metrics.py
hash: md5
md5: 5577a28107458dc1e6bcaaa098388095
size: 4144
params:
configs/generate_metrics.yaml:
dataclient_type: local
input_datahandler_type: parquet
metrics_output_filepath: ./metrics/metrics.json
metrics_type: Regression
output_datahandler_type: json
outs:
- path: metrics/metrics.json
hash: md5
md5: d79f798a272e6b50597be4d08ae48fa8
size: 180
startup_cleanup:
cmd: python startup_cleanup.py
deps:
- path: startup_cleanup.py
hash: md5
md5: 2e51fbcac960d0f960bf32a8ec7486a0
size: 1748
params:
configs/startup_cleanup.yaml:
artefacts: ./data
metrics: ./metrics

View file

@ -0,0 +1,56 @@
stages:
startup_cleanup:
cmd: python startup_cleanup.py
deps:
- startup_cleanup.py
params:
- configs/startup_cleanup.yaml:
- artefacts
- metrics
always_changed: true
prepare_data:
cmd: python prepare_data.py
deps:
- prepare_data.py
params:
- configs/prepare_data.yaml:
- output_test_filepath
- output_train_filepath
- train_proportion
outs:
- data/prepared_data/
always_changed: true
build_model:
cmd: python build_model.py
deps:
- build_model.py
- data/prepared_data
params:
- configs/build_model.yaml:
outs:
- data/model/
always_changed: true
generate_predictions:
cmd: python generate_predictions.py
deps:
- generate_predictions.py
- data/prepared_data
- data/model
params:
- configs/generate_predictions.yaml:
outs:
- data/predictions/
always_changed: true
generate_metrics:
cmd: python generate_metrics.py
deps:
- generate_metrics.py
- data/prepared_data
- data/predictions
params:
- configs/generate_metrics.yaml:
outs:
- metrics/metrics.json
always_changed: true
metrics:
- metrics/metrics.json

View file

@ -0,0 +1,120 @@
"""
Third part of the pipeline:
After the model is built, we can evaluate its performance
"""
import os
import yaml
import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceMetrics import MLMetrics
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.Logger import logger
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
client_path = Path(__file__).parent / "configs" / "client.yaml"
client_params = yaml.safe_load(open(client_path))
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
generate_predictions_path = (
Path(__file__).parent / "configs" / "generate_predictions.yaml"
)
generate_predictions_params = yaml.safe_load(open(generate_predictions_path))
generate_metrics_path = Path(__file__).parent / "configs" / "generate_metrics.yaml"
generate_metrics_params = yaml.safe_load(open(generate_metrics_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def generate_metrics(
input_dataclient: DataClient,
output_dataclient: DataClient,
model: MLModel,
metrics: MLMetrics,
target: str,
test_data_filepath: str,
predictions_output_filepath: str,
predictions_column_name: str,
metrics_output_filepath: str,
):
"""
For a given model, we generate prediction and evaluate this against the true target
"""
logger.info("-------------------------")
logger.info("--- Loading test data ---")
logger.info("-------------------------")
test_data = input_dataclient.load_data(
location=test_data_filepath,
)
logger.info("---------------------------")
logger.info("--- Loading predictions ---")
logger.info("---------------------------")
predictions = input_dataclient.load_data(location=predictions_output_filepath)
logger.info("--------------------------")
logger.info("--- Generating metrics ---")
logger.info("--------------------------")
metrics_output = metrics.generate_metrics(
target=test_data[target],
predictions=pd.Series(predictions[predictions_column_name]),
)
logger.info("----------------------")
logger.info("--- Saving metrics ---")
logger.info("----------------------")
output_dataclient.save_data(obj=metrics_output, location=metrics_output_filepath)
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
model = model_factory(build_model_params["model_type"])
# Use data client for input and output, as we use dvc to cache later to the cloud
dataclient_type = generate_metrics_params["dataclient_type"]
dataclient = dataclient_factory(
dataclient_type=dataclient_type,
dataclient_config=client_params[dataclient_type],
)
metrics = metrics_factory(generate_metrics_params["metrics_type"])
generate_metrics(
input_dataclient=dataclient,
output_dataclient=dataclient,
model=model,
metrics=metrics,
target=feature_process_params["feature_processor_config"]["target"],
test_data_filepath=generate_predictions_params["test_data_filepath"],
predictions_output_filepath=generate_predictions_params[
"predictions_output_filepath"
],
predictions_column_name=generate_predictions_params["predictions_column_name"],
metrics_output_filepath=generate_metrics_params["metrics_output_filepath"],
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")

View file

@ -0,0 +1,123 @@
"""
Third part of the pipeline:
After the model is built, we can evaluate its performance
"""
import os
import yaml
import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.Logger import logger
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
client_path = Path(__file__).parent / "configs" / "client.yaml"
client_params = yaml.safe_load(open(client_path))
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
generate_predictions_path = (
Path(__file__).parent / "configs" / "generate_predictions.yaml"
)
generate_predictions_params = yaml.safe_load(open(generate_predictions_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def generate_predictions(
input_dataclient: DataClient,
output_dataclient: DataClient,
model: MLModel,
target: str,
model_filepath: str,
test_data_filepath: str,
predictions_output_filepath: str,
predictions_column_name: str,
):
"""
For a given model, we generate prediction and evaluate this against the true target
"""
logger.info("-------------------------")
logger.info("--- Loading test data ---")
logger.info("-------------------------")
test_data = input_dataclient.load_data(location=test_data_filepath)
logger.info("---------------------")
logger.info("--- Loading model ---")
logger.info("---------------------")
model.load_model(model_filepath)
logger.info("------------------------------")
logger.info("--- Generating predictions ---")
logger.info("------------------------------")
prediction_data = (
test_data.drop(columns=target) if target in test_data.columns else test_data
)
predictions = model.predict(data=prediction_data)
logger.info("--------------------------")
logger.info("--- Saving predictions ---")
logger.info("--------------------------")
predictions_df = pd.DataFrame(predictions)
predictions_df.columns = [predictions_column_name]
output_dataclient.save_data(
obj=predictions_df, location=predictions_output_filepath
)
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
model = model_factory(build_model_params["model_type"])
# We may have different locations of loading hence why we use one specified in generate_predictions.yaml
# I.e. for metric runs, this will be a local data client
# For predictions, we will want a cloud data client
input_dataclient_type = generate_predictions_params["input_dataclient_type"]
input_dataclient = dataclient_factory(
dataclient_type=input_dataclient_type,
dataclient_config=client_params[input_dataclient_type],
)
output_dataclient_type = generate_predictions_params["output_dataclient_type"]
output_dataclient = dataclient_factory(
dataclient_type=output_dataclient_type,
dataclient_config=client_params[output_dataclient_type],
)
generate_predictions(
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
model=model,
target=feature_process_params["feature_processor_config"]["target"],
model_filepath=build_model_params["model_save_filepath"],
test_data_filepath=generate_predictions_params["test_data_filepath"],
predictions_output_filepath=generate_predictions_params[
"predictions_output_filepath"
],
predictions_column_name=generate_predictions_params["predictions_column_name"],
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")

View file

@ -0,0 +1 @@
/metrics.json

View file

@ -15,41 +15,17 @@ from configs.feature_processor_logic import business_logic, new_feature_funcs
from core.Logger import logger from core.Logger import logger
from core.DataClient import dataclient_factory from core.DataClient import dataclient_factory
from core.FeatureProcessor import feature_processor_factory from core.FeatureProcessor import feature_processor_factory
from config import settings
logger.info(f"--- Initiate Parameters ---")
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
client_params = settings.client client_path = Path(__file__).parent / "configs" / "client.yaml"
prepare_data_params = settings.prepare_data client_params = yaml.safe_load(open(client_path))
feature_process_params = settings.feature_processor
data_filepath = prepare_data_params["data_filepath"] prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
train_proportion = prepare_data_params["train_proportion"] prepare_data_params = yaml.safe_load(open(prepare_data_path))
output_train_filepath = prepare_data_params["output_train_filepath"]
output_test_filepath = prepare_data_params["output_test_filepath"]
feature_processor_config = feature_process_params["feature_processor_config"]
logger.info(f"--- Initiate DataClient ---") feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
input_dataclient_type = prepare_data_params["input_dataclient_type"]
output_dataclient_type = prepare_data_params["output_dataclient_type"]
input_dataclient = dataclient_factory(
dataclient_type=input_dataclient_type,
dataclient_config=client_params[input_dataclient_type],
)
output_dataclient = dataclient_factory(
dataclient_type=output_dataclient_type,
dataclient_config=client_params[output_dataclient_type],
)
logger.info(f"--- Initiate FeatureProcessor ---")
feature_processor = feature_processor_factory(
feature_process_params["feature_processor_type"]
)
def prepare_data( def prepare_data(
@ -70,11 +46,15 @@ def prepare_data(
:param pipeline_mode: bool, Default False, this caches out the file for experimentation, objects returned in pipeline mode :param pipeline_mode: bool, Default False, this caches out the file for experimentation, objects returned in pipeline mode
""" """
logger.info("--------------------")
logger.info("--- Loading data ---") logger.info("--- Loading data ---")
logger.info("--------------------")
data = input_dataclient.load_data(location=data_filepath, load_config={}) data = input_dataclient.load_data(location=data_filepath, load_config={})
logger.info("--------------------------")
logger.info("--- Feature Processing ---") logger.info("--- Feature Processing ---")
logger.info("--------------------------")
data = feature_processor.feature_process( data = feature_processor.feature_process(
data, data,
@ -83,51 +63,77 @@ def prepare_data(
new_feature_funcs=new_feature_funcs, new_feature_funcs=new_feature_funcs,
) )
logger.info("----------------------")
logger.info("--- Splitting data ---") logger.info("--- Splitting data ---")
logger.info("----------------------")
if train_proportion == 1: if train_proportion == 1:
train = data train = data
# Sample 10% of the data for testing test = None
test = data.sample(round(len(data) * 0.1))
else: else:
train, test = train_test_split( train, test = train_test_split(
data, train_size=train_proportion, test_size=(1 - train_proportion) data, train_size=train_proportion, test_size=(1 - train_proportion)
) )
test = test.reset_index(drop=True)
train = train.reset_index(drop=True)
logger.info("-----------------------")
logger.info("--- Outputting data ---") logger.info("--- Outputting data ---")
logger.info("-----------------------")
output_dataclient.save_data( output_dataclient.save_data(obj=train, location=output_train_filepath)
obj=train, location=output_train_filepath, save_config=None
)
if test is not None: if test is not None:
output_dataclient.save_data( output_dataclient.save_data(obj=test, location=output_test_filepath)
obj=test, location=output_test_filepath, save_config=None
)
return train, test return train, test
if __name__ == "__main__": if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---") logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
logger.info("----------------------------")
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
input_dataclient_type = prepare_data_params["input_dataclient_type"]
output_dataclient_type = prepare_data_params["output_dataclient_type"]
input_dataclient = dataclient_factory(
dataclient_type=input_dataclient_type,
dataclient_config=client_params[input_dataclient_type],
)
output_dataclient = dataclient_factory(
dataclient_type=output_dataclient_type,
dataclient_config=client_params[output_dataclient_type],
)
logger.info("----------------------------------")
logger.info(f"--- Initiate FeatureProcessor ---")
logger.info("----------------------------------")
feature_processor = feature_processor_factory(
feature_process_params["feature_processor_type"]
)
logger.info("---------------------------")
logger.info(f"--- Prepare Data Stage ---") logger.info(f"--- Prepare Data Stage ---")
logger.info("---------------------------")
prepare_data( prepare_data(
input_dataclient=input_dataclient, input_dataclient=input_dataclient,
output_dataclient=output_dataclient, output_dataclient=output_dataclient,
feature_processor=feature_processor, feature_processor=feature_processor,
data_filepath=data_filepath, data_filepath=prepare_data_params["data_filepath"],
train_proportion=train_proportion, train_proportion=prepare_data_params["train_proportion"],
output_train_filepath=output_train_filepath, output_train_filepath=prepare_data_params["output_train_filepath"],
output_test_filepath=output_test_filepath, output_test_filepath=prepare_data_params["output_test_filepath"],
feature_processor_config=feature_processor_config, feature_processor_config=feature_process_params["feature_processor_config"],
business_logic=business_logic, business_logic=business_logic,
new_feature_funcs=new_feature_funcs, new_feature_funcs=new_feature_funcs,
) )
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---") logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")

View file

@ -0,0 +1,8 @@
joblib==1.3.2
boto3==1.28.17
pandas==1.5.3
autogluon==0.8.2
pyarrow==13.0.0
pre-commit==3.3.3
sphinx==7.2.5
sphinx_rtd_theme==1.3.0

View file

@ -0,0 +1,6 @@
joblib==1.3.2
boto3==1.28.17
pandas==1.5.3
autogluon==0.8.2
pyarrow==13.0.0
PyYAML==6.0.1

View file

@ -0,0 +1,8 @@
joblib==1.3.2
boto3==1.28.17
pandas==1.5.3
autogluon==0.8.2
pyarrow==13.0.0
pre-commit==3.3.3
sphinx==7.2.5
sphinx_rtd_theme==1.3.0

View file

@ -0,0 +1,3 @@
boto3==1.28.41
pandas==1.5.3
autogluon==0.8.2

View file

@ -0,0 +1,3 @@
dvc==3.18.0
dvc-s3==2.23.0
gto==1.0.4

View file

@ -6,9 +6,9 @@ import shutil
import yaml import yaml
from pathlib import Path from pathlib import Path
from core.Logger import logger from core.Logger import logger
from config import settings
startup_cleanup_params = settings.startup_cleanup startup_cleanup_path = Path(__file__).parent / "configs" / "startup_cleanup.yaml"
startup_cleanup_params = yaml.safe_load(open(startup_cleanup_path))
def run_cleanup(artefacts_directory: str, metrics_directory: str) -> None: def run_cleanup(artefacts_directory: str, metrics_directory: str) -> None:
@ -16,9 +16,13 @@ def run_cleanup(artefacts_directory: str, metrics_directory: str) -> None:
Remove the directory where artefacts are stored Remove the directory where artefacts are stored
""" """
logger.info("---------------------")
logger.info(f"--- Run Clean up ---") logger.info(f"--- Run Clean up ---")
logger.info("---------------------")
logger.info("-------------------------")
logger.info(f"--- Delete artefacts ---") logger.info(f"--- Delete artefacts ---")
logger.info("-------------------------")
artefact_directory_path = Path(artefacts_directory) artefact_directory_path = Path(artefacts_directory)
@ -27,7 +31,9 @@ def run_cleanup(artefacts_directory: str, metrics_directory: str) -> None:
logger.info(f"Removing the directory: {artefacts_directory}") logger.info(f"Removing the directory: {artefacts_directory}")
shutil.rmtree(artefact_directory_path) shutil.rmtree(artefact_directory_path)
logger.info("-----------------------")
logger.info(f"--- Delete metrics ---") logger.info(f"--- Delete metrics ---")
logger.info("-----------------------")
metrics_directory_path = Path(metrics_directory) metrics_directory_path = Path(metrics_directory)
@ -39,11 +45,15 @@ def run_cleanup(artefacts_directory: str, metrics_directory: str) -> None:
if __name__ == "__main__": if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---") logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
run_cleanup( run_cleanup(
artefacts_directory=startup_cleanup_params["artefacts"], artefacts_directory=startup_cleanup_params["artefacts"],
metrics_directory=startup_cleanup_params["metrics"], metrics_directory=startup_cleanup_params["metrics"],
) )
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---") logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")

View file

@ -0,0 +1,3 @@
# Pipeline2
PLACEHOLDER PIPELINE IF NEEDED