Merge pull request #49 from Hestia-Homes/feature/hubspot_to_db

Feature/hubspot to db
This commit is contained in:
Jun-te Kim 2025-05-14 16:12:10 +01:00 committed by GitHub
commit b702404fec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 2136 additions and 861 deletions

View file

@ -0,0 +1,37 @@
# Weekly job that runs etl/hubspot_to_invoice.py (HubSpot deals -> DB -> invoice calculation).
name: HubSpot Deals to DB loading and Invoice Calculator
on:
  schedule:
    # GitHub evaluates cron in UTC: every Sunday at 19:00.
    - cron: '0 19 * * 0'
  # Allow manual runs from the Actions tab.
  workflow_dispatch:
jobs:
  sharepoint-validator:
    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'
      - name: Install dependencies
        run: |
          pip install poetry
          poetry install --no-root
      - name: run script
        run: |
          pwd
          ls -la
          poetry run python etl/hubspot_to_invoice.py
        env:
          PYTHONPATH: ${{ github.workspace }}
          # Database credentials must never be committed in plain text: read the
          # full connection string from a repository secret instead.
          # NOTE(review): create the DATABASE_URL secret and rotate the password
          # that was previously hard-coded on this line.
          DATABASE_URL: ${{ secrets.DATABASE_URL }}
          SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID: ${{ secrets.SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID }}
          JJC_SERVICE_SHAREPOINT_ID: ${{ secrets.JJC_SERVICE_SHAREPOINT_ID }}
          BAXTER_KELLY_SERVICE_SHAREPOINT_ID: ${{ secrets.BAXTER_KELLY_SERVICE_SHAREPOINT_ID }}
          SGEC_SERVICE_SHAREPOINT_ID: ${{ secrets.SGEC_SERVICE_SHAREPOINT_ID }}
          SHAREPOINT_CLIENT_ID: ${{ secrets.SHAREPOINT_CLIENT_ID }}
          SHAREPOINT_CLIENT_SECRET: ${{ secrets.SHAREPOINT_CLIENT_SECRET }}
          SHAREPOINT_TENANT_ID: ${{ secrets.SHAREPOINT_TENANT_ID }}

View file

@ -2,8 +2,12 @@
"jupyter.interactiveWindow.textEditor.executeSelection": true,
"python.REPL.sendToNativeREPL": true,
"notebook.output.scrolling": true,
"notebook.output.textLineLimit": 0
"terminal.integrated.defaultProfile.linux": "bash",
"terminal.integrated.profiles.linux": {
"bash": {
"path": "/bin/bash"
}
},
// Hot reload setting that needs to be in user settings
// "jupyter.runStartupCommands": [

View file

@ -1,119 +1,2 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = alembic/
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic//versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic//versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
script_location = alembic

View file

@ -1,81 +1,50 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import create_engine
from sqlalchemy import pool
from alembic import context
from sqlmodel import SQLModel
from etl.load.preSiteNoteTypes import *
from etl.load.topLevel import *
import os
# Load the database URL from the DATABASE_URL environment variable and fail
# fast when it is missing (a single guard; the second one was unreachable).
db_url = os.getenv("DATABASE_URL")
if not db_url:
    raise RuntimeError("Please set DATABASE_URL")

import logging

logging.basicConfig(level=logging.INFO)

# This is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Config values go through ConfigParser interpolation, so a literal '%'
# (e.g. from a URL-encoded password) must be doubled before it is stored.
config.set_main_option("sqlalchemy.url", db_url.replace("%", "%%"))

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Model metadata used for 'autogenerate' support; the SQLModel table classes
# are brought in by the wildcard imports at the top of this file.
target_metadata = SQLModel.metadata
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    Configures the context with just the database URL (no Engine), so no
    DBAPI needs to be available. Calls to context.execute() here emit the
    given string to the script output.
    """
    # Pass the URL exactly once: the SOURCE carried both `url=url` and
    # `url=db_url`, and a repeated keyword argument is a syntax error.
    context.configure(
        url=db_url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    Creates an Engine straight from ``db_url`` and associates a live
    connection with the migration context.
    """
    # NullPool: no connections are kept around after this one is closed.
    engine = create_engine(db_url, poolclass=pool.NullPool)

    with engine.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )

        with context.begin_transaction():
            context.run_migrations()

View file

@ -0,0 +1,33 @@
"""Add address in company
Revision ID: 348cc76ccdb4
Revises: 6f76c19a8930
Create Date: 2025-05-14 14:50:09.952275
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import sqlmodel
# revision identifiers, used by Alembic.
revision: str = '348cc76ccdb4'
down_revision: Union[str, None] = '6f76c19a8930'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: add the required ``address`` column to ``companyinfo``."""
    # A NOT NULL column without a default cannot be added to a table that
    # already contains rows, so existing rows are backfilled with '' via a
    # temporary server default ...
    op.add_column(
        'companyinfo',
        sa.Column(
            'address',
            sqlmodel.sql.sqltypes.AutoString(),
            nullable=False,
            server_default='',
        ),
    )
    # ... which is then removed so the final schema has no default on the column.
    op.alter_column('companyinfo', 'address', server_default=None)
def downgrade() -> None:
    """Downgrade schema: remove the ``address`` column from ``companyinfo``."""
    op.drop_column(table_name='companyinfo', column_name='address')

View file

@ -0,0 +1,34 @@
"""mistake on foreign key
Revision ID: 4439bf516ac8
Revises: c6d2f6bf094a
Create Date: 2025-05-12 15:03:09.037008
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '4439bf516ac8'
down_revision: Union[str, None] = 'c6d2f6bf094a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: re-point ``presitenote.pre_site_note_description_id``.

    Drops the existing foreign key on that column and recreates it against
    ``propertydescription.id``.
    """
    fk_name = 'presitenote_pre_site_note_description_id_fkey'
    op.drop_constraint(fk_name, 'presitenote', type_='foreignkey')
    # Create the replacement under an explicit name (not None) so that it can
    # later be dropped by name regardless of backend naming rules.
    op.create_foreign_key(
        fk_name,
        'presitenote',
        'propertydescription',
        ['pre_site_note_description_id'],
        ['id'],
    )
def downgrade() -> None:
    """Downgrade schema: point the foreign key back at ``assessorinfo.id``."""
    # drop_constraint needs a real constraint name; the autogenerated `None`
    # cannot be rendered as SQL. This is the same name upgrade() drops, i.e.
    # the name the unnamed foreign key on this column ends up with.
    fk_name = 'presitenote_pre_site_note_description_id_fkey'
    op.drop_constraint(fk_name, 'presitenote', type_='foreignkey')
    op.create_foreign_key(
        fk_name,
        'presitenote',
        'assessorinfo',
        ['pre_site_note_description_id'],
        ['id'],
    )

View file

@ -0,0 +1,37 @@
"""Added more data columns for documents
Revision ID: 6f76c19a8930
Revises: 4439bf516ac8
Create Date: 2025-05-14 14:06:01.697106
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import sqlmodel
# revision identifiers, used by Alembic.
revision: str = '6f76c19a8930'
down_revision: Union[str, None] = '4439bf516ac8'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: replace ``documents.author`` with an ``assessor_id`` FK."""
    # NOTE(review): adding a NOT NULL column without a default fails when
    # 'documents' already has rows, and there is no generic default for a
    # foreign key; confirm the table is empty or backfill before running.
    op.add_column('documents', sa.Column('assessor_id', sa.Uuid(), nullable=False))
    # Name the constraint explicitly (not None) so it can be dropped by name.
    op.create_foreign_key(
        'documents_assessor_id_fkey',
        'documents',
        'assessorinfo',
        ['assessor_id'],
        ['id'],
    )
    # NOTE(review): existing 'author' values are discarded here.
    op.drop_column('documents', 'author')
def downgrade() -> None:
    """Downgrade schema: restore ``documents.author`` and drop ``assessor_id``."""
    # 'author' is NOT NULL: backfill existing rows with '' through a temporary
    # server default, then remove the default to restore the original column.
    op.add_column(
        'documents',
        sa.Column(
            'author',
            sa.VARCHAR(),
            autoincrement=False,
            nullable=False,
            server_default='',
        ),
    )
    op.alter_column('documents', 'author', server_default=None)
    # drop_constraint needs a real name; `None` cannot be rendered as SQL.
    # NOTE(review): 'documents_assessor_id_fkey' follows the
    # <table>_<column>_fkey pattern used for unnamed FKs elsewhere in these
    # migrations — confirm against the live database.
    op.drop_constraint('documents_assessor_id_fkey', 'documents', type_='foreignkey')
    op.drop_column('documents', 'assessor_id')

View file

@ -0,0 +1,328 @@
"""Initial migration
Revision ID: 9f45742b4b2f
Revises:
Create Date: 2025-05-12 13:24:03.856980
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import sqlmodel
# revision identifiers, used by Alembic.
revision: str = '9f45742b4b2f'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# Creates the initial set of tables. Tables are created parents-first: every
# table named in a ForeignKeyConstraint below is created before the table
# that references it.
# ### commands auto generated by Alembic - please adjust! ###
# --- Tables without foreign keys ---
op.create_table('companyinfo',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('trading_name', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('post_code', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('fax_number', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('related_party_disclosure', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
# NOTE(review): u_value_w_m2_k is a string here, a Float on 'walls' and an
# Integer on 'windows' — confirm the differing types are intended.
op.create_table('door',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('no_of_doors', sa.Integer(), nullable=False),
sa.Column('no_of_insulated_doors', sa.Integer(), nullable=False),
sa.Column('u_value_w_m2_k', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('floors',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('floor_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('ground_floor_construction', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('ground_floor_insulation_type', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('floor_insulation_thickness_mm', sa.Float(), nullable=True),
sa.Column('u_value_known', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('fluegasheatrecoverysystem',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('fghrs_present', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('heatingsystemcontrols',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('control_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('flue_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('fan_assisted_flue', sa.Boolean(), nullable=False),
sa.Column('heat_emitter_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('electricity_meter_type', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('mains_gas_available', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('heatingtype',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('heating_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('fuel_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('hotwatercylinder',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('volume', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('insulation_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('insulation_thickness', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('thermostat', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# NOTE(review): no foreign key in this migration references 'insulation'.
op.create_table('insulation',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('lighting',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('total_no_of_light_fittings', sa.Integer(), nullable=False),
sa.Column('total_no_of_lel_fittings', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# NOTE(review): 'main_gas_avalible' is spelled differently from
# heatingsystemcontrols.mains_gas_available; it is a live column name, so
# renaming it needs its own migration.
op.create_table('otherdetails',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('electricity_meter_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('main_gas_avalible', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('photovoltaicpanel',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('pvs_are_connected_to_dwelling_electricity_meter', sa.Boolean(), nullable=False),
sa.Column('percentage_of_external_roof_area_with_pvs', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('presitenotessummaryinfo',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('reference_number', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('epc_language', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('uprn', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('postcode', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('region', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('address', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('town', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('county', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('property_tenure', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('transaction_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('inspection_date', sa.DateTime(), nullable=False),
sa.Column('current_sap', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('potential_sap', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('current_ei', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('potential_ei', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('current_annual_emissions', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('current_annual_emission_including_0925_multiplayer', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('current_annual_energy_costs', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('roofs',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('construction', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('insulation_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('insulation_thickness', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('u_value_known', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('showerandbaths',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('no_of_rooms_with_baths_and_or_shower', sa.Integer(), nullable=False),
sa.Column('no_of_rooms_with_mixer_shower_and_no_baths', sa.Integer(), nullable=False),
sa.Column('no_of_rooms_with_mixer_shower_and_baths', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('solarwaterheating',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('solar_water_heating_details_known', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('ventilationandcooling',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('no_of_open_fireplaces', sa.Integer(), nullable=False),
sa.Column('ventilation_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('space_cooling_system_present', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('walls',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('construction', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('insulation', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('insulation_thickness_mm', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('wall_thickness_measured', sa.Boolean(), nullable=False),
sa.Column('wall_thickness_mm', sa.Integer(), nullable=True),
sa.Column('u_value_known', sa.Boolean(), nullable=False),
sa.Column('u_value_w_m2_k', sa.Float(), nullable=True),
sa.Column('dry_lining', sa.Boolean(), nullable=False),
sa.Column('alternative_wall_present', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('waterheating',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('heating_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('fuel_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('windturbine',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('wind_turbine', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# --- Tables with foreign keys to tables created above ---
# assessorinfo.company_id -> companyinfo.id
op.create_table('assessorinfo',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('accreditation_number', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('name', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('phone_number', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('email_address', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('company_id', sa.Uuid(), nullable=True),
sa.ForeignKeyConstraint(['company_id'], ['companyinfo.id'], ),
sa.PrimaryKeyConstraint('id')
)
# heating.controls_id -> heatingsystemcontrols.id
op.create_table('heating',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('heating_source', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('efficiency_source', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('heating_fuel', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('brand_name', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('model_name', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('model_qualifer', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('sap_2009_table', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('percentage_of_heated_floor_area_served', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('controls_id', sa.Uuid(), nullable=True),
sa.ForeignKeyConstraint(['controls_id'], ['heatingsystemcontrols.id'], ),
sa.PrimaryKeyConstraint('id')
)
# propertydetail links one wall, roof and floor row (all optional).
op.create_table('propertydetail',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('age_band', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('wall_id', sa.Uuid(), nullable=True),
sa.Column('roof_id', sa.Uuid(), nullable=True),
sa.Column('floor_id', sa.Uuid(), nullable=True),
sa.ForeignKeyConstraint(['floor_id'], ['floors.id'], ),
sa.ForeignKeyConstraint(['roof_id'], ['roofs.id'], ),
sa.ForeignKeyConstraint(['wall_id'], ['walls.id'], ),
sa.PrimaryKeyConstraint('id')
)
# dimension.property_detail_id -> propertydetail.id
op.create_table('dimension',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('floor_area_m2', sa.Float(), nullable=False),
sa.Column('room_height_m', sa.Float(), nullable=False),
sa.Column('loss_perimeter_m', sa.Float(), nullable=False),
sa.Column('party_wall_length_m', sa.Float(), nullable=False),
sa.Column('property_detail_id', sa.Uuid(), nullable=True),
sa.ForeignKeyConstraint(['property_detail_id'], ['propertydetail.id'], ),
sa.PrimaryKeyConstraint('id')
)
# presitenote ties a summary-info row to an assessor (both required).
op.create_table('presitenote',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('summary_info_id', sa.Uuid(), nullable=False),
sa.Column('assessor_id', sa.Uuid(), nullable=False),
sa.ForeignKeyConstraint(['assessor_id'], ['assessorinfo.id'], ),
sa.ForeignKeyConstraint(['summary_info_id'], ['presitenotessummaryinfo.id'], ),
sa.PrimaryKeyConstraint('id')
)
# propertydescription references the component tables; only
# main_property_id is required, every other *_id column is nullable.
op.create_table('propertydescription',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('built_form', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('detachment_or_position', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('no_of_main_property', sa.Integer(), nullable=False),
sa.Column('no_of_extension_1', sa.Integer(), nullable=True),
sa.Column('no_of_extension_2', sa.Integer(), nullable=True),
sa.Column('no_of_extension_3', sa.Integer(), nullable=True),
sa.Column('no_of_extension_4', sa.Integer(), nullable=True),
sa.Column('no_of_habitable_rooms', sa.Integer(), nullable=False),
sa.Column('no_of_heated_rooms', sa.Integer(), nullable=False),
sa.Column('heated_basement', sa.Boolean(), nullable=False),
sa.Column('conservatory_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('percentage_of_draught_proofed', sa.Integer(), nullable=False),
sa.Column('terrain_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('conservatory', sa.Boolean(), nullable=False),
sa.Column('main_property_id', sa.Uuid(), nullable=False),
sa.Column('ex1_property_id', sa.Uuid(), nullable=True),
sa.Column('ex2_property_id', sa.Uuid(), nullable=True),
sa.Column('ex3_property_id', sa.Uuid(), nullable=True),
sa.Column('ex4_property_id', sa.Uuid(), nullable=True),
sa.Column('door_id', sa.Uuid(), nullable=True),
sa.Column('ventilation_and_cooling_id', sa.Uuid(), nullable=True),
sa.Column('lighting_id', sa.Uuid(), nullable=True),
sa.Column('water_heating_id', sa.Uuid(), nullable=True),
sa.Column('hot_water_cylinder_id', sa.Uuid(), nullable=True),
sa.Column('solar_water_heating_id', sa.Uuid(), nullable=True),
sa.Column('shower_and_baths_id', sa.Uuid(), nullable=True),
sa.Column('flue_gas_heat_recovery_system_id', sa.Uuid(), nullable=True),
sa.Column('photovoltaic_panel_id', sa.Uuid(), nullable=True),
sa.Column('wind_turbine_id', sa.Uuid(), nullable=True),
sa.Column('other_details_id', sa.Uuid(), nullable=True),
sa.Column('main_heating_id', sa.Uuid(), nullable=True),
sa.Column('main_heating2_id', sa.Uuid(), nullable=True),
sa.Column('secondary_heating_type_id', sa.Uuid(), nullable=True),
sa.ForeignKeyConstraint(['door_id'], ['door.id'], ),
sa.ForeignKeyConstraint(['ex1_property_id'], ['propertydetail.id'], ),
sa.ForeignKeyConstraint(['ex2_property_id'], ['propertydetail.id'], ),
sa.ForeignKeyConstraint(['ex3_property_id'], ['propertydetail.id'], ),
sa.ForeignKeyConstraint(['ex4_property_id'], ['propertydetail.id'], ),
sa.ForeignKeyConstraint(['flue_gas_heat_recovery_system_id'], ['fluegasheatrecoverysystem.id'], ),
sa.ForeignKeyConstraint(['hot_water_cylinder_id'], ['hotwatercylinder.id'], ),
sa.ForeignKeyConstraint(['lighting_id'], ['lighting.id'], ),
sa.ForeignKeyConstraint(['main_heating2_id'], ['heating.id'], ),
sa.ForeignKeyConstraint(['main_heating_id'], ['heating.id'], ),
sa.ForeignKeyConstraint(['main_property_id'], ['propertydetail.id'], ),
sa.ForeignKeyConstraint(['other_details_id'], ['otherdetails.id'], ),
sa.ForeignKeyConstraint(['photovoltaic_panel_id'], ['photovoltaicpanel.id'], ),
sa.ForeignKeyConstraint(['secondary_heating_type_id'], ['heatingtype.id'], ),
sa.ForeignKeyConstraint(['shower_and_baths_id'], ['showerandbaths.id'], ),
sa.ForeignKeyConstraint(['solar_water_heating_id'], ['solarwaterheating.id'], ),
sa.ForeignKeyConstraint(['ventilation_and_cooling_id'], ['ventilationandcooling.id'], ),
sa.ForeignKeyConstraint(['water_heating_id'], ['waterheating.id'], ),
sa.ForeignKeyConstraint(['wind_turbine_id'], ['windturbine.id'], ),
sa.PrimaryKeyConstraint('id')
)
# windows.property_detail_id -> propertydetail.id
# NOTE(review): u_value_w_m2_k and g_value are Integer columns here — confirm
# that fractional values never need to be stored.
op.create_table('windows',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('glazing_type', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('area_m2', sa.Float(), nullable=False),
sa.Column('roof_window', sa.Boolean(), nullable=False),
sa.Column('orientation', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('u_value_w_m2_k', sa.Integer(), nullable=False),
sa.Column('g_value', sa.Integer(), nullable=False),
sa.Column('property_detail_id', sa.Uuid(), nullable=True),
sa.ForeignKeyConstraint(['property_detail_id'], ['propertydetail.id'], ),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade() -> None:
    """Downgrade schema by dropping every table created in upgrade()."""
    # Listed in drop order: tables holding foreign keys come before the
    # tables they reference.
    tables_in_drop_order = (
        'windows',
        'propertydescription',
        'presitenote',
        'dimension',
        'propertydetail',
        'heating',
        'assessorinfo',
        'windturbine',
        'waterheating',
        'walls',
        'ventilationandcooling',
        'solarwaterheating',
        'showerandbaths',
        'roofs',
        'presitenotessummaryinfo',
        'photovoltaicpanel',
        'otherdetails',
        'lighting',
        'insulation',
        'hotwatercylinder',
        'heatingtype',
        'heatingsystemcontrols',
        'fluegasheatrecoverysystem',
        'floors',
        'door',
        'companyinfo',
    )
    for table_name in tables_in_drop_order:
        op.drop_table(table_name)

View file

@ -1,37 +0,0 @@
"""initla db
Revision ID: b650a366b88d
Revises:
Create Date: 2025-03-28 10:36:15.235350
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'b650a366b88d'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
pass
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('assessorinfo',
sa.Column('id', sa.UUID(), autoincrement=False, nullable=False),
sa.Column('accreditation_number', sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column('name', sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column('phone_number', sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column('email_address', sa.VARCHAR(), autoincrement=False, nullable=True),
sa.PrimaryKeyConstraint('id', name='assessorinfo_pkey')
)
# ### end Alembic commands ###

View file

@ -0,0 +1,34 @@
"""make it nullable
Revision ID: c6d2f6bf094a
Revises: 9f45742b4b2f
Create Date: 2025-05-12 14:55:43.208954
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'c6d2f6bf094a'
down_revision: Union[str, None] = '9f45742b4b2f'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: add nullable ``presitenote.pre_site_note_description_id``.

    The column is created with a foreign key to ``assessorinfo.id``.
    """
    op.add_column(
        'presitenote',
        sa.Column('pre_site_note_description_id', sa.Uuid(), nullable=True),
    )
    # Name the constraint explicitly (not None) so later operations can drop
    # it by name regardless of backend naming rules.
    op.create_foreign_key(
        'presitenote_pre_site_note_description_id_fkey',
        'presitenote',
        'assessorinfo',
        ['pre_site_note_description_id'],
        ['id'],
    )
def downgrade() -> None:
    """Downgrade schema: remove ``presitenote.pre_site_note_description_id``."""
    # drop_constraint needs a real name; the autogenerated `None` cannot be
    # rendered as SQL.
    # NOTE(review): this is the <table>_<column>_fkey name an unnamed FK on
    # this column is expected to get — confirm against the live database.
    op.drop_constraint(
        'presitenote_pre_site_note_description_id_fkey',
        'presitenote',
        type_='foreignkey',
    )
    op.drop_column('presitenote', 'pre_site_note_description_id')

View file

@ -7,9 +7,9 @@ os.environ["JJC_SERVICE_SHAREPOINT_ID"] = "7fdd0485-bbf3-4b29-b30f-98c81c2a6284"
from etl.scraper.scraper import SharePointScraper, SharePointInstaller, WEEK_COMMENCING
import pandas as pd
from etl.surveyedData.surveryedData import surveyedDataProcessor
import etl.scraper.scraper as scraper_module
def return_pandas_from_scraping(week_commencing, installer):
scraper_module.WEEK_COMMENCING = week_commencing
sp = SharePointScraper(installer)

118
etl/db/hubSpotLoad.py Normal file
View file

@ -0,0 +1,118 @@
from etl.hubSpotClient.hubspot import HubSpotClient, DealStage
from etl.surveyPrice.surveyPrice import SurveyPrice
from etl.surveyedData.surveryedData import surveyedDataProcessor
from etl.scraper.scraper import SharePointScraper, SharePointInstaller
from etl.db.db import get_db_session, init_db
from urllib.parse import unquote
class HubspotTodb():
def __init__(self):
    """Call init_db(), create the HubSpot client and reset the per-run state."""
    init_db()
    self.hubspot = HubSpotClient()
    # deals_in_hubspot is filled by get_all_deals(); data_in_sharepoint
    # collects one processor per address while gathering SharePoint data.
    self.deals_in_hubspot, self.data_in_sharepoint = None, []
def get_all_deals(self):
    """Fetch all surveys through SurveyPrice, cache them on the instance and return them."""
    deals = SurveyPrice().get_all_surveys_from_hubspot()
    self.deals_in_hubspot = deals
    return deals
def get_sharepoint_path(self, url):
    """Extract the folder path below 'Shared Documents' from a SharePoint URL.

    The folder is read from the URL's ``id`` query parameter, which is
    percent-decoded before use.

    Args:
        url: SharePoint link whose query string carries an ``id`` parameter.

    Returns:
        The decoded path after the first 'Shared Documents' marker with
        surrounding slashes stripped; the whole decoded ``id`` value (also
        stripped) when the marker is absent; ``None`` when the URL has no
        ``id`` parameter.
    """
    # Only the query string matters. Match the parameter *name* exactly:
    # a substring test for 'id=' would also hit 'viewid=...'.
    _, _, query = url.partition('?')
    for pair in query.split('&'):
        key, _, value = pair.partition('=')
        if key == 'id':
            decoded_path = unquote(value)
            break
    else:
        return None

    # Drop the leading '/sites/<site>/Shared Documents' prefix. Partitioning
    # on the first marker keeps the rest intact even if a sub-folder name
    # contains 'Shared Documents' again.
    _, marker, relative_path = decoded_path.partition('Shared Documents')
    if marker:
        return relative_path.strip('/')
    return decoded_path.strip('/')
def get_sharepoint_scraper(self, installer):
    """Return a SharePointScraper for the given installer name, or None.

    The name is compared case-insensitively; only "J & J CRUMP" and "SCIS"
    are recognised.
    """
    installer_key = installer.upper()
    if installer_key == "J & J CRUMP":
        return SharePointScraper(SharePointInstaller.JJC)
    if installer_key == "SCIS":
        return SharePointScraper(SharePointInstaller.SOUTH_COAST_INSULATION)
    # Any other installer has no scraper.
    return None
def create_files_locally(self, sp, path, address):
address_paths = {}
file_names_to_download = {}
avoid = [".jpg",".mov", ".JPG", ".heic", ".HEIC", ".png", ".PNG", ".jpeg", ".JPEG", ".mov", ".MOV", ".mp4", ".MP4"]
microsoft_graph_data = sp.get_folders_in_path(path)
for file in microsoft_graph_data['value']:
if 'file' in file:
if any(file["name"].endswith(ext) for ext in avoid):
continue
file_names_to_download.update({file["name"]: file['@microsoft.graph.downloadUrl']})
each_file = []
for file_name, url in file_names_to_download.items():
content = sp.get_file_content(url)
file_path = sp.create_temp_file(content, f"{address}/{file_name}")
each_file.append(file_path)
address_paths.update({address: each_file})
return address_paths
def gather_data_from_each_sharepoint(self):
self.get_all_deals()
for _, row in self.deals_in_hubspot.iterrows():
sp = self.get_sharepoint_scraper(row["HUBSPOT_INSTALLER"])
path = self.get_sharepoint_path(row["HUBSPOT_SHAREPOINT_PATH"])
data_loc = self.create_files_locally(sp, path, row["HUBSPOT_DEAL_ADDRESS"])
for add, file_loc in data_loc.items():
sdp = surveyedDataProcessor(add, file_loc)
sdp.hubspot_deal_id = row["HUBSPOT_DEAL_ID"]
self.data_in_sharepoint.append(sdp)
def load_all(self, fast=False):
if fast is False:
self.gather_data_from_each_sharepoint()
with get_db_session() as session:
self.load_pre_site_note(session)
session.commit()
def load_pre_site_note(self, db_session):
for surveyedData in self.data_in_sharepoint:
# Loads Assessor information and Company information to db
assessor = surveyedData.load_assessor_table(db_session)
# Loads the pre site summary information
summary_info = surveyedData.load_pre_site_notes_summary_table(db_session)
property_description = surveyedData.load_property_description(db_session)
# Creates the a final pre site note table that links all information
presitenote = surveyedData.create_pre_site_note_table(db_session, assessor, summary_info, property_description)
df = self.deals_in_hubspot
df = df[df["HUBSPOT_DEAL_ID"] == str(surveyedData.hubspot_deal_id)]
building_table = surveyedData.create_buildings_table(
db_session,
df["HUBSPOT_LANDLORD_ID"].values[0],
df["HUBSPOT_DOMNA_ID"].values[0],
)
documents = surveyedData.create_document_table_via_pre_site_note(db_session, presitenote, assessor, building_table)
# Create building table or find building table to add new pre_site_note

View file

@ -35,7 +35,45 @@ class HubSpotClient():
return deal.properties.get("dealname", "No deal name")
except Exception as e:
return "Unknown Deal" # Fallback if the deal name is not found
def get_listings_from_deals_id(self, deals_id):
    """Return the first "0-420" object associated with a deal, or None.

    Only the first search hit is ever used by callers of this method, so a
    single one-result request is issued instead of paging through (and
    sleeping between) every page of results.

    Args:
        deals_id: HubSpot deal id used in the ``associations.deal`` filter.

    Returns:
        The first search result, carrying the ``domna_property_id``,
        ``owner_property_id`` and ``national_uprn`` properties, or ``None``
        when nothing is associated with the deal.
    """
    from hubspot.crm.objects import PublicObjectSearchRequest

    search_request = PublicObjectSearchRequest(
        filter_groups=[{
            "filters": [{
                "propertyName": "associations.deal",  # Filter by association to the deal
                "operator": "EQ",
                "value": deals_id,
            }]
        }],
        properties=["domna_property_id", "owner_property_id", "national_uprn"],
        limit=1,
    )
    # NOTE(review): "0-420" is presumably the HubSpot listings object type - confirm.
    response = self.client.crm.objects.search_api.do_search(
        object_type="0-420",
        public_object_search_request=search_request,
    )
    # Keep the original one-second pause after each search call
    # (presumably rate limiting - confirm).
    time.sleep(1)
    results = response.results or []
    return results[0] if results else None
def get_domna_and_landlord_id(self, deals_id):
    """Return ``(domna_property_id, owner_property_id, national_uprn)`` for a deal.

    Args:
        deals_id: HubSpot deal id passed to ``get_listings_from_deals_id``.

    Raises:
        ValueError: If the deal has no associated listing. Previously this
            surfaced as an ``AttributeError`` on ``None``.
    """
    data = self.get_listings_from_deals_id(deals_id)
    if data is None:
        raise ValueError(f"No listing is associated with HubSpot deal {deals_id}")
    properties = data.properties
    return (
        properties['domna_property_id'],
        properties['owner_property_id'],
        properties['national_uprn'],
    )
def get_notes_from_deals_id(self, deals_id):
from hubspot.crm.objects import PublicObjectSearchRequest
found_notes = []
@ -115,6 +153,17 @@ class HubSpotClient():
"deal_owner": deal.properties.get("hubspot_owner_id"),
})
return all_deals
def get_associations_for_deal(self, deal_id, to_object_type):
    """
    Look up the objects of type `to_object_type` associated with a deal
    (e.g. "contacts", "companies", "notes", etc.) and return their IDs
    as a list.
    """
    associations_api = self.client.crm.deals.associations_api
    response = associations_api.get_all(
        deal_id=deal_id,
        to_object_type=to_object_type,
    )
    associated_ids = []
    for association in response.results:
        associated_ids.append(association.id)
    return associated_ids
def get_deals_from_deal_stage(self, deal_stage: DealStage):
found_deals = []
@ -136,6 +185,7 @@ class HubSpotClient():
"domna_survey_post_sap",
"existing_wall_insulation",
"installer",
"submission_folder",
],
limit=200,
after=after,
@ -148,6 +198,7 @@ class HubSpotClient():
all_deals = []
for deal in found_deals:
domna_id, landlord_id, uprn = self.get_domna_and_landlord_id(deal.id)
all_deals.append(SubmissionInfoFromDeal(
deal_id= deal.properties["hs_object_id"],
deal_name=deal.properties["dealname"],
@ -157,7 +208,12 @@ class HubSpotClient():
existing_wall_insulation=deal.properties.get("existing_wall_insulation") if deal.properties.get("existing_wall_insulation") else "None",
no_of_wet_rooms=int(deal.properties["number_of_wet_rooms_needing_ventilation"]),
installer=deal.properties["installer"],
submission_folder_path = deal.properties["submission_folder"],
landlord_id = landlord_id,
domna_id = domna_id,
uprn = uprn,
))
return all_deals
def print_all_pipeline_ids(self):

View file

@ -18,4 +18,8 @@ class SubmissionInfoFromDeal(BaseModel):
post_sap_score: int
existing_wall_insulation: str
no_of_wet_rooms: int
installer: str
installer: str
submission_folder_path: str
landlord_id: str
domna_id: str
uprn: str

13
etl/hubspot_to_db.py Normal file
View file

@ -0,0 +1,13 @@
import os

# Non-secret SharePoint identifiers. setdefault() keeps these as local
# fallbacks only, so values injected by the environment (for example by the
# CI workflow) are no longer overwritten.
os.environ.setdefault("SHAREPOINT_CLIENT_ID", "895e3b77-b1d7-43ec-b18f-dcfe07cdfeaf")
os.environ.setdefault("SHAREPOINT_TENANT_ID", "c3f7519c-2719-4547-af04-6da6cbfd8f8f")
os.environ.setdefault("SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID", "b5a51507-9427-4ee0-b03e-90ec7681e2d3")
os.environ.setdefault("JJC_SERVICE_SHAREPOINT_ID", "7fdd0485-bbf3-4b29-b30f-98c81c2a6284")

# The client secret must come from the environment, never from source control.
# NOTE(review): the value previously committed here should be rotated.
if not os.environ.get("SHAREPOINT_CLIENT_SECRET"):
    raise RuntimeError("SHAREPOINT_CLIENT_SECRET must be set in the environment")

# Imported after the environment is prepared, matching the original ordering.
from etl.db.hubSpotLoad import HubspotTodb

dbLoader = HubspotTodb()
dbLoader.load_all()

View file

@ -6,8 +6,13 @@ os.environ["SHAREPOINT_CLIENT_SECRET"] = "SOf8Q~-is4wdQiqvEEm9FlJQRAY9ELGaj5Qz-a
os.environ["SHAREPOINT_TENANT_ID"] = "c3f7519c-2719-4547-af04-6da6cbfd8f8f"
os.environ["SOUTH_COAST_INSULATION_SERVICE_SHAREPOINT_ID"] = "b5a51507-9427-4ee0-b03e-90ec7681e2d3"
os.environ["JJC_SERVICE_SHAREPOINT_ID"] = "7fdd0485-bbf3-4b29-b30f-98c81c2a6284"
# Local development
# os.environ["DATABASE_URL"] = "postgresql://postgres:makingwarmhomes@db:5432/postgres"
from etl.surveyPrice.surveyPrice import SurveyPrice
from etl.db.hubSpotLoad import HubspotTodb
sp = SurveyPrice()
@ -31,48 +36,10 @@ sp.upload_to_sharepoint(sp.get_master_rate_card_path(), "COPY_OF_RATE_CARD_USED.
deal_ids = df["HUBSPOT_DEAL_ID"].tolist()
# Load to db
dbLoader = HubspotTodb()
dbLoader.load_all()
# Commented out as i don't want to sync up hubspot_to_db just yet
sp.move_deals_to_completed(deal_ids)
"""
TODO:
Tuesday
P1) - Get ready for demo, 3 examples of solar (JJC and SCIS), 3 examples of cavity wall (SCIS and JJC), 12 in total
P2) Review deem score with last weeks deem score values to ensure accuracy
P3) Figure out what to do if I see an address that isn't registered but has been surveyed
P3) Write documentation for tech demos from Khalims demo - Handed off to cyrus
"""
# Look for
# JJC
# 3 examples of Solar
# No solar example in the April deem score
# 3 examples Cavity Wall, FOAM, Empty and General ideally
# (in hubspot )111 Duddell Road General ( fibre) - 500, 2 wet rooms
# Empty
# ( in hubspot ) 29 Lower King ( empty ) - 500 - 400
# Foam
# ( in hubspot ) 6 STOKESAY STREET (foam) - 400 - 200
# SCIS
# 3 examples of Solar
# ( in hubspot ) 12 short hedges - Solar 1608
# ( in hubspot ) 18 short hedge - Solar 1608
# ( in hubspot) 6 forety road -Solar 1608
# 3 examples Cavity Wall, FOAM, Empty and General ideally
# ( in hubspot ) 319 Muirfield Road, (Empty Cavity) - 1000
# ( hubspot ) 2 queensway, (Fibre) - 500
# ( in hubspot ) 56 Aughton Crescent - (foam) - To be worked out by Lewis but let's use this as an opportunity -
# Compare value with what I should get and in the deem score. Keep tabs below so I can check easily
# Change w.c. date to a weird one to speed up automation
# Observation:
"""
2 queensway is wrong due the fact that csr and empty cavity but deem score says cavity
"""

View file

@ -0,0 +1,336 @@
from sqlmodel import Field, SQLModel, Relationship
import uuid
from typing import Optional, List
from datetime import datetime
from pydantic import EmailStr
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
from etl.load.topLevel import BaseModel, Documents
class PreSiteNote(BaseModel, table=True):
    """Table model tying one pre-site note to its summary, assessor and property description rows."""
    # Required link to the summary row (presitenotessummaryinfo.id).
    summary_info_id: uuid.UUID = Field(
        foreign_key="presitenotessummaryinfo.id",
        nullable=False
    )
    summary_info: Optional["PreSiteNotesSummaryInfo"] = Relationship(back_populates="pre_site_notes")
    # Assessor Info
    # Required link to the assessor row (assessorinfo.id).
    assessor_id: uuid.UUID = Field(
        foreign_key="assessorinfo.id",
        nullable=False
    )
    assessor: Optional["AssessorInfo"] = Relationship(back_populates="pre_site_notes")
    # Link to the property description row; the only nullable link of the three.
    pre_site_note_description_id: uuid.UUID = Field(
        foreign_key="propertydescription.id",
        nullable=True
    )
    pre_site_note_description: Optional["PropertyDescription"] = Relationship(back_populates="pre_site_notes")
class Dimension(BaseModel, table=True):
    """Table model holding one set of dimension measurements for a PropertyDetail row."""
    # Measurements; the unit is carried in each field-name suffix (m2 / m).
    floor_area_m2: float
    room_height_m: float
    loss_perimeter_m: float
    party_wall_length_m: float
    # Optional owning PropertyDetail; paired with PropertyDetail.dimensions.
    property_detail_id: Optional[uuid.UUID] = Field(default=None, foreign_key="propertydetail.id")
    property_detail: Optional["PropertyDetail"] = Relationship(back_populates="dimensions")
class Walls(BaseModel, table=True):
    """Table model describing wall construction and insulation; referenced by PropertyDetail.wall_id."""
    construction: str
    insulation: str
    # Stored as text even though the name carries a "mm" suffix.
    insulation_thickness_mm: str
    wall_thickness_measured: bool
    # Nullable; presumably only set when wall_thickness_measured is true - TODO confirm.
    wall_thickness_mm: Optional[int]
    u_value_known: bool
    # Nullable; presumably only set when u_value_known is true - TODO confirm.
    u_value_w_m2_k: Optional[float]
    dry_lining: bool
    alternative_wall_present: bool
class Roofs(BaseModel, table=True):
    """Table model describing roof construction and insulation; referenced by PropertyDetail.roof_id."""
    construction: str
    insulation_type: str
    # Free-text thickness (stored as a string, not a number).
    insulation_thickness: str
    u_value_known: bool
class Floors(BaseModel, table=True):
    """Table model describing floor construction and insulation; referenced by PropertyDetail.floor_id."""
    floor_type: str
    ground_floor_construction: str
    # Defaults to an empty string when no insulation type is supplied.
    ground_floor_insulation_type: Optional[str] = ""
    # Defaults to -1 when not supplied (presumably a "not recorded" sentinel - TODO confirm).
    floor_insulation_thickness_mm: Optional[float] = -1
    u_value_known: bool
class Windows(BaseModel, table=True):
    """Table model holding one window entry for a PropertyDetail row."""
    glazing_type: str
    area_m2: float
    roof_window: bool
    orientation: str
    # NOTE(review): declared as int here while Walls.u_value_w_m2_k is a float -
    # confirm that whole-number U and g values are really intended.
    u_value_w_m2_k: int
    g_value: int
    # Optional owning PropertyDetail; paired with PropertyDetail.windows.
    property_detail_id: Optional[uuid.UUID] = Field(default=None, foreign_key="propertydetail.id")
    property_detail: Optional["PropertyDetail"] = Relationship(back_populates="windows")
class PropertyDetail(BaseModel, table=True):
    """Table model for one building part: age band plus links to its wall, roof and floor rows."""
    age_band: str
    # Optional links to the Walls / Roofs / Floors rows describing this part.
    wall_id: Optional[uuid.UUID] = Field(default=None, foreign_key="walls.id")
    roof_id: Optional[uuid.UUID] = Field(default=None, foreign_key="roofs.id")
    floor_id: Optional[uuid.UUID] = Field(default=None, foreign_key="floors.id")
    # Relationships
    # Child rows that point back here through their property_detail_id column.
    dimensions: List[Dimension] = Relationship(back_populates="property_detail")
    windows: List[Windows] = Relationship(back_populates="property_detail")
class Door(BaseModel, table=True):
    """Table model holding door counts; referenced by PropertyDescription.door_id."""
    no_of_doors: int
    no_of_insulated_doors: int
    # Stored as nullable text rather than a number.
    u_value_w_m2_k: Optional[str]
    # Reverse side of PropertyDescription.door.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="door")
class VentilationAndCooling(BaseModel, table=True):
    """Table model holding ventilation and cooling details; referenced by PropertyDescription.ventilation_and_cooling_id."""
    no_of_open_fireplaces: int
    ventilation_type: str
    space_cooling_system_present: bool
    # Reverse side of PropertyDescription.ventilation_and_cooling.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="ventilation_and_cooling")
class Lighting(BaseModel, table=True):
    """Table model holding light-fitting counts; referenced by PropertyDescription.lighting_id."""
    total_no_of_light_fittings: int
    # "lel" presumably means low-energy lighting - TODO confirm.
    total_no_of_lel_fittings: int
    # Reverse side of PropertyDescription.lighting.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="lighting")
class HeatingSystemControls(BaseModel, table=True):
    """Table model holding heating control details; referenced by Heating.controls_id."""
    control_type: str
    flue_type: str
    fan_assisted_flue: bool
    heat_emitter_type: str
    # Optional values with explicit defaults (empty string / False) when not supplied.
    electricity_meter_type: Optional[str] = ""
    mains_gas_available: Optional[bool] = False
class Heating(BaseModel, table=True):
    """Table model describing one heating system.

    PropertyDescription points at this table twice (main_heating_id and
    main_heating2_id), so each reverse relationship below names the foreign
    key it follows.
    """
    type: str
    heating_source: str
    efficiency_source: str
    heating_fuel: str
    brand_name: str
    # NOTE(review): names starting with "model_" may clash with pydantic's
    # protected "model_" namespace - confirm no warning is raised.
    model_name: str
    # Spelling ("qualifer") is part of the column name; do not rename casually.
    model_qualifer: str
    sap_2009_table: Optional[str] = ""
    percentage_of_heated_floor_area_served: Optional[str] = ""
    # Optional link to the HeatingSystemControls row.
    controls_id: Optional[uuid.UUID] = Field(default=None, foreign_key="heatingsystemcontrols.id")
    # Reverse side of PropertyDescription.main_heating (via main_heating_id).
    property_description: Optional["PropertyDescription"] = Relationship(
        back_populates="main_heating", sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.main_heating_id]"}
    )
    # Reverse side of PropertyDescription.main_heating2 (via main_heating2_id).
    property_description2: Optional["PropertyDescription"] = Relationship(
        back_populates="main_heating2", sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.main_heating2_id]"}
    )
class HeatingType(BaseModel, table=True):
    """Table model holding a heating type and fuel; referenced by PropertyDescription.secondary_heating_type_id."""
    heating_type: str
    fuel_type: str
    # Reverse side of PropertyDescription.secondary_heating_type.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="secondary_heating_type")
class WaterHeating(BaseModel, table=True):
    """Table model holding the water heating type and fuel; referenced by PropertyDescription.water_heating_id."""
    heating_type: str
    fuel_type: str
    # Reverse side of PropertyDescription.water_heating.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="water_heating")
class HotWaterCylinder(BaseModel, table=True):
    """Table model describing a hot water cylinder; referenced by PropertyDescription.hot_water_cylinder_id."""
    # Volume and thickness are stored as text, not numbers.
    volume: str
    insulation_type: str
    insulation_thickness: str
    thermostat: bool
    # Reverse side of PropertyDescription.hot_water_cylinder.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="hot_water_cylinder")
class SolarWaterHeating(BaseModel, table=True):
    """Table model holding the solar water heating flag; referenced by PropertyDescription.solar_water_heating_id."""
    solar_water_heating_details_known: bool
    # Reverse side of PropertyDescription.solar_water_heating.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="solar_water_heating")
class ShowerAndBaths(BaseModel, table=True):
    """Table model holding bath/shower room counts; referenced by PropertyDescription.shower_and_baths_id."""
    no_of_rooms_with_baths_and_or_shower: int
    no_of_rooms_with_mixer_shower_and_no_baths: int
    no_of_rooms_with_mixer_shower_and_baths: int
    # Reverse side of PropertyDescription.shower_and_baths.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="shower_and_baths")
class FlueGasHeatRecoverySystem(BaseModel, table=True):
    """Table model holding the FGHRS-present flag; referenced by PropertyDescription.flue_gas_heat_recovery_system_id."""
    fghrs_present: bool
    # Reverse side of PropertyDescription.flue_gas_heat_recovery_system.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="flue_gas_heat_recovery_system")
class PhotovoltaicPanel(BaseModel, table=True):
    """Table model holding photovoltaic panel details; referenced by PropertyDescription.photovoltaic_panel_id."""
    pvs_are_connected_to_dwelling_electricity_meter: bool
    # Percentage is stored as text, not a number.
    percentage_of_external_roof_area_with_pvs: str
    # Reverse side of PropertyDescription.photovoltaic_panel.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="photovoltaic_panel")
class WindTurbine(BaseModel, table=True):
    """Table model holding the wind turbine flag; referenced by PropertyDescription.wind_turbine_id."""
    wind_turbine: bool
    # Reverse side of PropertyDescription.wind_turbine.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="wind_turbine")
class OtherDetails(BaseModel, table=True):
    """Table model holding meter type and mains gas flag; referenced by PropertyDescription.other_details_id."""
    electricity_meter_type: str
    # Spelling ("avalible") is part of the column name; note that
    # HeatingSystemControls spells its similar field "mains_gas_available".
    main_gas_avalible: bool
    # Reverse side of PropertyDescription.other_details.
    property_description: Optional["PropertyDescription"] = Relationship(back_populates="other_details")
class PropertyDescription(BaseModel, table=True):
    """Table model describing a surveyed property and linking all of its detail tables.

    Holds the headline attributes of the property plus optional foreign keys
    to the building parts (main property and up to four extensions) and to
    each fabric/services table. Tables that are referenced more than once
    (PropertyDetail, Heating) name the foreign key each relationship follows.
    """
    built_form: str
    detachment_or_position: str
    no_of_main_property: int
    # Extension counts default to 0 when not supplied.
    no_of_extension_1: Optional[int] = 0
    no_of_extension_2: Optional[int] = 0
    no_of_extension_3: Optional[int] = 0
    no_of_extension_4: Optional[int] = 0
    no_of_habitable_rooms: int
    no_of_heated_rooms: int
    heated_basement: bool
    conservatory_type: str
    percentage_of_draught_proofed: int
    terrain_type: str
    conservatory: bool

    # Building parts: the main property is required, extensions are optional.
    main_property_id: uuid.UUID = Field(foreign_key="propertydetail.id")
    ex1_property_id: Optional[uuid.UUID] = Field(default=None, foreign_key="propertydetail.id")
    ex2_property_id: Optional[uuid.UUID] = Field(default=None, foreign_key="propertydetail.id")
    ex3_property_id: Optional[uuid.UUID] = Field(default=None, foreign_key="propertydetail.id")
    ex4_property_id: Optional[uuid.UUID] = Field(default=None, foreign_key="propertydetail.id")

    # Optional links to the detail tables.
    door_id: Optional[uuid.UUID] = Field(default=None, foreign_key="door.id")
    ventilation_and_cooling_id: Optional[uuid.UUID] = Field(default=None, foreign_key="ventilationandcooling.id")
    lighting_id: Optional[uuid.UUID] = Field(default=None, foreign_key="lighting.id")
    water_heating_id: Optional[uuid.UUID] = Field(default=None, foreign_key="waterheating.id")
    hot_water_cylinder_id: Optional[uuid.UUID] = Field(default=None, foreign_key="hotwatercylinder.id")
    solar_water_heating_id: Optional[uuid.UUID] = Field(default=None, foreign_key="solarwaterheating.id")
    shower_and_baths_id: Optional[uuid.UUID] = Field(default=None, foreign_key="showerandbaths.id")
    flue_gas_heat_recovery_system_id: Optional[uuid.UUID] = Field(default=None, foreign_key="fluegasheatrecoverysystem.id")
    photovoltaic_panel_id: Optional[uuid.UUID] = Field(default=None, foreign_key="photovoltaicpanel.id")
    wind_turbine_id: Optional[uuid.UUID] = Field(default=None, foreign_key="windturbine.id")
    other_details_id: Optional[uuid.UUID] = Field(default=None, foreign_key="otherdetails.id")
    main_heating_id: Optional[uuid.UUID] = Field(default=None, foreign_key="heating.id")
    main_heating2_id: Optional[uuid.UUID] = Field(default=None, foreign_key="heating.id")
    secondary_heating_type_id: Optional[uuid.UUID] = Field(default=None, foreign_key="heatingtype.id")

    # Relationships
    # Five foreign keys point at propertydetail, so each relationship must
    # state which one it follows.
    main_property: Optional["PropertyDetail"] = Relationship(sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.main_property_id]"})
    ex1_property: Optional["PropertyDetail"] = Relationship(sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.ex1_property_id]"})
    ex2_property: Optional["PropertyDetail"] = Relationship(sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.ex2_property_id]"})
    ex3_property: Optional["PropertyDetail"] = Relationship(sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.ex3_property_id]"})
    ex4_property: Optional["PropertyDetail"] = Relationship(sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.ex4_property_id]"})

    # Related Models
    door: Optional["Door"] = Relationship(back_populates="property_description")
    ventilation_and_cooling: Optional["VentilationAndCooling"] = Relationship(back_populates="property_description")
    lighting: Optional["Lighting"] = Relationship(back_populates="property_description")
    water_heating: Optional["WaterHeating"] = Relationship(back_populates="property_description")
    hot_water_cylinder: Optional["HotWaterCylinder"] = Relationship(back_populates="property_description")
    solar_water_heating: Optional["SolarWaterHeating"] = Relationship(back_populates="property_description")
    shower_and_baths: Optional["ShowerAndBaths"] = Relationship(back_populates="property_description")
    flue_gas_heat_recovery_system: Optional["FlueGasHeatRecoverySystem"] = Relationship(back_populates="property_description")
    photovoltaic_panel: Optional["PhotovoltaicPanel"] = Relationship(back_populates="property_description")
    wind_turbine: Optional["WindTurbine"] = Relationship(back_populates="property_description")
    other_details: Optional["OtherDetails"] = Relationship(back_populates="property_description")
    main_heating: Optional["Heating"] = Relationship(back_populates="property_description", sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.main_heating_id]"})
    # Pairs with Heating.property_description2, which declares
    # back_populates="main_heating2"; pointing this at "property_description"
    # would collide with the main_heating pairing above.
    main_heating2: Optional["Heating"] = Relationship(back_populates="property_description2", sa_relationship_kwargs={"foreign_keys": "[PropertyDescription.main_heating2_id]"})
    secondary_heating_type: Optional["HeatingType"] = Relationship(back_populates="property_description")
    # NOTE(review): PreSiteNote holds the foreign key, so this side may hold
    # several notes even though it is annotated as a single optional value.
    pre_site_notes: Optional["PreSiteNote"] = Relationship(back_populates="pre_site_note_description")
class AssessorInfo(BaseModel, table=True):
    """Table model for an assessor, linked to a company and to the notes and documents they produced."""
    accreditation_number: str
    name: str
    # Contact details are optional.
    phone_number: Optional[str] = None
    email_address: Optional[EmailStr] = None
    # Optional employer; paired with CompanyInfo.assessors.
    company_id: Optional[uuid.UUID] = Field(default=None, foreign_key="companyinfo.id")
    company: Optional["CompanyInfo"] = Relationship(back_populates="assessors")
    # Reverse sides of PreSiteNote.assessor and Documents.author.
    pre_site_notes: List["PreSiteNote"] = Relationship(back_populates="assessor")
    documents: List["Documents"] = Relationship(back_populates="author")
class PreSiteNotesSummaryInfo(BaseModel, table=True):
    """Table model holding the summary section of a pre-site note: address, inspection and rating values."""
    reference_number: str
    epc_language: str
    # Defaults to an empty string when no UPRN is supplied.
    uprn: Optional[str] = ""
    postcode: str
    region: str
    address: str
    town: str
    county: Optional[str] = ""
    property_tenure: str
    transaction_type: str
    inspection_date: datetime
    # Ratings, emissions and costs are all stored as text.
    current_sap: str
    potential_sap: str
    current_ei: str
    potential_ei: str
    current_annual_emissions: str
    # "multiplayer" presumably means "multiplier"; the spelling is part of the column name.
    current_annual_emission_including_0925_multiplayer: str
    current_annual_energy_costs: str
    # Reverse side of PreSiteNote.summary_info.
    pre_site_notes: List["PreSiteNote"] = Relationship(back_populates="summary_info")
class CompanyInfo(BaseModel, table=True):
    """Table model for an assessor's company."""
    address: str
    trading_name: str
    post_code: str
    fax_number: Optional[str] = None
    related_party_disclosure: Optional[str] = None
    # Reverse side of AssessorInfo.company.
    assessors: List[AssessorInfo] = Relationship(back_populates="company")
class Insulation(BaseModel, table=True):
    """Table model holding a single insulation type value."""
    type: str
PreSiteNote.update_forward_refs()
AssessorInfo.update_forward_refs()

37
etl/load/topLevel.py Normal file
View file

@ -0,0 +1,37 @@
from sqlmodel import Field, SQLModel, Relationship
import uuid
from typing import Optional, List
from datetime import datetime
from pydantic import EmailStr
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
from etl.pdfReader.reportType import ReportType
class BaseModel(SQLModel):
    """Shared base for the table models: supplies a UUID primary key."""
    # A fresh uuid4 is generated for each instance unless an id is passed in.
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
class Buildings(BaseModel, table=True):
    """Table model for a building, identified by address plus its UPRN, landlord and Domna IDs."""
    address: str
    postcode: str
    UPRN: str
    landlord_id: str
    domna_id: str
    # Reverse side of Documents.building.
    documents: List["Documents"] = Relationship(back_populates="building")
class Documents(BaseModel, table=True):
    """Table model recording one document: its author, building, type and the row it refers to."""
    # Required link to the authoring assessor (assessorinfo.id).
    assessor_id: uuid.UUID = Field(
        foreign_key="assessorinfo.id",
        nullable=False
    )
    author: Optional["AssessorInfo"] = Relationship(back_populates="documents")
    created_at: datetime
    document_type: ReportType
    # Required link to the building the document belongs to.
    building_id: uuid.UUID = Field(foreign_key="buildings.id", nullable=False)
    building: Optional["Buildings"] = Relationship(back_populates="documents")
    # Table name and row id of the referenced record; no foreign key is
    # declared on target_id (presumably because the target table varies - TODO confirm).
    target_table: str
    target_id: uuid.UUID
Documents.update_forward_refs()

Binary file not shown.

View file

@ -1,10 +1,13 @@
from monday import MondayClient
import json
import requests
board_id = "8829428746"
import time
from tqdm import tqdm
board_id = "3584401309"
monday_key = "eyJhbGciOiJIUzI1NiJ9.eyJ0aWQiOjQ5ODc2ODQxOCwiYWFpIjoxMSwidWlkIjozNjE3ODAzNCwiaWFkIjoiMjAyNS0wNC0xMVQxMToyMzoxNy40NjdaIiwicGVyIjoibWU6d3JpdGUiLCJhY3RpZCI6MTM5OTc4MjMsInJnbiI6InVzZTEifQ.-2Lit4s46ZF6AXuMW9t0TxIaFLkHqD4Yo-PyM9i2XZY"
monday = MondayClient(monday_key)
import time
def get_all_items(board_id, monday):
# Parameters
@ -75,7 +78,7 @@ if not postcode_col_id or not location_col_id:
raise Exception("Could not find 'postcode' or 'location' columns")
items = get_all_items(board_id, monday)
for item in items:
for item in tqdm(items):
item_name = item["name"]
item_id = item["id"]

View file

@ -7,46 +7,49 @@ os.environ["SHAREPOINT_TENANT_ID"] = "10d5af8b-2cfd-4882-9ccd-b96e4812dacf"
from etl.scraper.scraper import SharePointInstaller
from etl.scraper.scraper import SharePointScraper
import pandas as pd
from tqdm import tqdm
osmosis = SharePointScraper(SharePointInstaller.OSMOSIS)
parent_folder = "Automated Example"
osmosis.create_dir(parent_folder, "/JTK Test Folder")
osmosis = SharePointScraper(SharePointInstaller.OSMOSIS_WAVE_2)
asset_list = pd.read_excel("osmosis_data/asset_list.xlsx", sheet_name="2502 accent housing")
parent_folder = "/Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders/12. Decent Homes"
asset_list = pd.read_excel("osmosis_data/asset_list.xlsx", sheet_name="Sheet1")
new_asset_list = []
parent_folder = "JTK Test Folder/Automated Example"
# Create asset list and location
for index, address in asset_list.iterrows():
webUrl = osmosis.create_dir(address['Name'], parent_folder)
for index, address in tqdm(asset_list.iterrows()):
folder_name = address['Name'] + " " + address['Postcode']
webUrl = osmosis.create_dir(folder_name, parent_folder)
first_folder = "1. Retrofit Assessment"
osmosis.create_dir(first_folder, parent_folder + f"/{address['Name']}")
osmosis.create_dir("A. Assessment", parent_folder + f"/{address['Name']}/{first_folder}")
osmosis.create_dir("B. Air Tightness Tests", parent_folder + f"/{address['Name']}/{first_folder}")
osmosis.create_dir(first_folder, parent_folder + f"/{folder_name}")
osmosis.create_dir("A. Assessment", parent_folder + f"/{folder_name}/{first_folder}")
osmosis.create_dir("B. Air Tightness Tests", parent_folder + f"/{folder_name}/{first_folder}")
second_folder = "2. RC Mid-Term Plan"
osmosis.create_dir(second_folder, parent_folder + f"/{address['Name']}")
osmosis.create_dir("SAP", parent_folder + f"/{address['Name']}/{second_folder}")
osmosis.create_dir(second_folder, parent_folder + f"/{folder_name}")
osmosis.create_dir("SAP", parent_folder + f"/{folder_name}/{second_folder}")
third_folder = "3. Retrofit Design"
osmosis.create_dir(third_folder, parent_folder + f"/{address['Name']}")
osmosis.create_dir(third_folder, parent_folder + f"/{folder_name}")
fourth_folder = "4. Post EPC"
osmosis.create_dir(fourth_folder, parent_folder + f"/{address['Name']}")
osmosis.create_dir(f"{address['Name']} - POST EPC Photos", parent_folder + f"/{address['Name']}/{fourth_folder}")
osmosis.create_dir(fourth_folder, parent_folder + f"/{folder_name}")
osmosis.create_dir(f"{address['Name']} - POST EPC Photos", parent_folder + f"/{folder_name}/{fourth_folder}")
fifth_folder = "5. Trustmark Lodgement"
osmosis.create_dir(fifth_folder, parent_folder + f"/{address['Name']}")
osmosis.create_dir("1. Works", parent_folder + f"/{address['Name']}/{fifth_folder}")
osmosis.create_dir(fifth_folder, parent_folder + f"/{folder_name}")
osmosis.create_dir("1. Works", parent_folder + f"/{folder_name}/{fifth_folder}")
osmosis.create_dir("2. Required Documents", parent_folder + f"/{address['Name']}/{fifth_folder}")
osmosis.create_dir("3. Additional Documents", parent_folder + f"/{address['Name']}/{fifth_folder}")
osmosis.create_dir("2. Required Documents", parent_folder + f"/{folder_name}/{fifth_folder}")
osmosis.create_dir("3. Additional Documents", parent_folder + f"/{folder_name}/{fifth_folder}")
asset_data = {
"Name": address['Name'],
"Postcode": address['Postcode'],
"Sharepoint": webUrl,
}

View file

@ -2,8 +2,8 @@ from enum import Enum
class ReportType(Enum):
QUIDOS_PRESITE_NOTE = 1
CHARTED_SURVEYOR_REPORT = 2
ENERGY_PERFORMANCE_REPORT = 3
U_VALUE_CALCULATOR_REPORT = 4
OVERWRITING_U_VALUE_DECLARATION_FORM = 5
QUIDOS_PRESITE_NOTE = "quidos_presite_note"
CHARTED_SURVEYOR_REPORT = "charted_surveyor_report"
ENERGY_PERFORMANCE_REPORT = "energy_performance_report"
U_VALUE_CALCULATOR_REPORT = "u_value_calculator_report"
OVERWRITING_U_VALUE_DECLARATION_FORM = "overwriting_u_value_declaration_form"

View file

@ -1,6 +1,6 @@
from etl.pdfReader.reportType import ReportType
from etl.transform.types import (
CompanyInfo, SurverySummaryInfo, AssessorInfo,
from etl.transform.preSiteNoteTypes import (
CompanyInfo, PreSiteNotesSummaryInfo, AssessorInfo,
PropertyDescription, PropertyDetail, Dimension,
Walls, Roofs, Floors, Door, VentilationAndCooling,
Lighting, WaterHeating, HotWaterCylinder, SolarWaterHeating,
@ -59,8 +59,6 @@ class CSR(SiteNotesExtractor):
type=dict_.get('detailed_description_of_existing_cavity_wall_insulation_', "")
) if dict_ is not None else None
@ -124,7 +122,7 @@ class QuidosSiteNotesExtractor(SiteNotesExtractor):
else:
including_9_92_emission_factor = None
self.survey_information = SurverySummaryInfo(
self.survey_information = PreSiteNotesSummaryInfo(
reference_number = get_value('Reference Number'),
epc_language = get_value('EPC Language'),
uprn = get_value('UPRN'),
@ -147,6 +145,7 @@ class QuidosSiteNotesExtractor(SiteNotesExtractor):
self.company_information = CompanyInfo(
address=self.raw_data[self.get_x_occurance(self.raw_data,'Address', 1) + 1],
trading_name = get_value('Company name/trading name'),
post_code = get_value('POST CODE'),
fax_number = get_value('Fax number'),

View file

@ -8,15 +8,14 @@ from etl.utils.sharepoint.sharepoint import SharePointClient
from functools import wraps
import re
from etl.validator.validator import DomnaSharePointValidator
from tqdm import tqdm
from datetime import datetime, timedelta
def previous_monday(today=None):
    """Return the "W.C." (week commencing) label for the previous week's Monday.

    Args:
        today: Optional date or datetime to measure from; defaults to
            ``datetime.today()``.

    Returns:
        A string of the form ``"W.C. dd.mm.YYYY"`` for the Monday of the week
        before the one containing ``today``.
    """
    if today is None:
        today = datetime.today()
    # weekday() is 0 on Monday: step back to this week's Monday, then 7 more days.
    last_monday = today - timedelta(days=today.weekday() + 7)
    return f"W.C. {last_monday.strftime('%d.%m.%Y')}"
WEEK_COMMENCING = os.getenv("WEEK_COMMENCING", previous_monday())
@ -27,7 +26,8 @@ class SharePointInstaller(Enum):
SGEC = os.getenv("SGEC_SERVICE_SHAREPOINT_ID", None)
BAXTER_KELLY = os.getenv("BAXTER_KELLY_SERVICE_SHAREPOINT_ID", "6f930bf3-572d-4f91-b1ae-ec536fa319e2")
DOMNA = os.getenv("DOMNA_SHAREPOINT_ID", "8ab64924-ccde-4b56-b0dc-4e11596446e4")
OSMOSIS = os.getenv("OSMOSIS_SHAREPOINT_ID", "350a3b48-8311-4506-8abb-69bafc280d6f")
OSMOSIS_WAVE_3 = os.getenv("OSMOSIS_SHAREPOINT_ID", "350a3b48-8311-4506-8abb-69bafc280d6f")
OSMOSIS_WAVE_2 = os.getenv("OSMOSIS_SHAREPOINT_ID", "bc925a9a-ad0b-4de9-9a3c-e61014cc7489")
WARMFRONT = os.getenv("WARMFRONT_SHARPOINT_ID", "bea71c30-d366-454c-a484-ae4d6fd95bc4")
class SharePointScraper():
@ -154,7 +154,7 @@ class SharePointScraper():
@ensure_surveyor_names_loaded
def get_date_folder_names(self):
for name in tqdm(self.surveyor_names):
for name in self.surveyor_names:
dates_folders = self.get_folders_in_path(f"/{name}")
if 'value' not in dates_folders:
raise RuntimeError(f"Failed to get dates folder from {name} in {self.sharepoint_drive.name}")
@ -234,7 +234,7 @@ class SharePointScraper():
@ensure_housing_assosiation_is_loaded
def get_number_of_surverys_completed(self):
for name in tqdm(self.surveyor_names):
for name in self.surveyor_names:
if name in self.surveyor_to_housing_assosications:
for house_ass in self.surveyor_to_housing_assosications[name]:
address_folders = self.get_folders_in_path(f"/{name}/{WEEK_COMMENCING}/{house_ass}")
@ -273,7 +273,7 @@ class SharePointScraper():
@ensure_housing_assosiation_is_loaded
def download_file_for_each_address(self):
paths = []
for name in tqdm(self.surveyor_names):
for name in self.surveyor_names:
if WEEK_COMMENCING in self.surveyor_to_dates_folder[name]:
for house_ass in self.surveyor_to_housing_assosications[name]:
address_files = self.get_folders_in_path(f"/{name}/{WEEK_COMMENCING}/{house_ass}")

View file

@ -2,7 +2,7 @@
import os
from etl.surveyedData.surveryedData import surveyedDataProcessor
from etl.db.db import get_db_session, init_db
from etl.transform.types import AssessorInfo
from etl.transform.preSiteNoteTypes import AssessorInfo
pre_site_note_path = os.path.join(os.getcwd(), "..", "example_data", "pre_site_note.pdf")
@ -15,7 +15,7 @@ init_db()
assessor0 = AssessorInfo(
**survey_one.pre_site_note.assessor_information.__dict__
**survey_one.pre_site_note.assessor_information.model_dump()
)
with get_db_session() as session:

View file

@ -14,7 +14,7 @@ class SurveyPrice():
self.master_rate_card_path = None
self.all_hubspot_submissions = None
self.all_survey_info_from_sharepoint = None
self.download_price_card()
self.required_sheets = [
'JJC - EMPTIES',
@ -42,27 +42,27 @@ class SurveyPrice():
"JJC - ECO4 CWI EMPTY": "JJC - EMPTIES",
"JJC - GBIS CWI EMPTY": "JJC - EMPTIES",
"JJC - ECO4 CWI REMEDIAL - FOAM": "JJC - FORMALDEHYDE EXTRACTION",
"JJC - ECO4 CWI REMEDIAL - GENERAL": "JJC - GENERAL EXTRACTIONS",
"JJC - ECO4 CWI REMEDIAL": "JJC - GENERAL EXTRACTIONS",
"JJC - GBIS CWI REMEDIAL - FOAM": "JJC - FORMALDEHYDE EXTRACTION",
"JJC - GBIS CWI REMEDIAL - GENERAL": "JJC - GENERAL EXTRACTIONS",
"JJC - GBIS CWI REMEDIAL": "JJC - GENERAL EXTRACTIONS",
# SCIS
"SCIS - ECO4 PV": "SCIS - SOLAR",
"SCIS - ECO4 CWI EMPTY": "SCIS - EMPTIES",
"SCIS - GBIS CWI EMPTY": "SCIS - EMPTIES",
"SCIS - ECO4 CWI REMEDIAL - FOAM": "SCIS - GENERAL EXTRACTIONS",
"SCIS - ECO4 CWI REMEDIAL - GENERAL": "SCIS - GENERAL EXTRACTIONS",
"SCIS - ECO4 CWI REMEDIAL": "SCIS - GENERAL EXTRACTIONS",
"SCIS - GBIS CWI REMEDIAL - FOAM": "SCIS - GENERAL EXTRACTIONS",
"SCIS - GBIS CWI REMEDIAL - GENERAL": "SCIS - GENERAL EXTRACTIONS",
"SCIS - GBIS CWI REMEDIAL": "SCIS - GENERAL EXTRACTIONS",
# SGEC
"SGEC - ECO4 CWI EMPTY": "SGEC - EMPTIES",
"SGEC - GBIS CWI EMPTY": "SGEC - EMPTIES",
"SGEC - ECO4 CWI REMEDIAL - FOAM": "SGEC - FORMALDEHYDE EXTRACTION",
"SGEC - ECO4 CWI REMEDIAL - GENERAL": "SGEC - GENERAL EXTRACTIONS",
"SGEC - ECO4 CWI REMEDIAL": "SGEC - GENERAL EXTRACTIONS",
"SGEC - GBIS CWI REMEDIAL - FOAM": "SGEC - FORMALDEHYDE EXTRACTION",
"SGEC - GBIS CWI REMEDIAL - GENERAL": "SGEC - GENERAL EXTRACTIONS",
"SGEC - GBIS CWI REMEDIAL": "SGEC - GENERAL EXTRACTIONS",
}
def download_price_card(self):
@ -148,6 +148,10 @@ class SurveyPrice():
"HUBSPOT_POST_INSTALL_SAP_SCORE": deal.post_sap_score,
"HUBSPOT_INSTALLER": deal.installer,
"HUBSPOT_WETROOMS": deal.no_of_wet_rooms,
"HUBSPOT_SHAREPOINT_PATH": deal.submission_folder_path,
"HUBSPOT_LANDLORD_ID": deal.landlord_id,
"HUBSPOT_DOMNA_ID": deal.domna_id,
"HUBSPOT_UPRN": deal.uprn,
})
self.all_hubspot_submissions = pd.DataFrame(all_deals)
@ -164,7 +168,7 @@ class SurveyPrice():
scis_pd = self.sharepoint_data_for_installer(SharePointInstaller.SOUTH_COAST_INSULATION)
self.all_survey_info_from_sharepoint = pd.concat([jjc_pd, scis_pd], ignore_index=True)
return self.all_survey_info_from_sharepoint
def sharepoint_data_for_installer(self, installer):
@ -172,7 +176,7 @@ class SurveyPrice():
file_paths = sp.download_file_for_each_address()
surveys = []
for eachAddress in file_paths:
for eachAddress in tqdm(file_paths):
for address, files in eachAddress.items():
surveys.append(surveyedDataProcessor(address, files))
@ -318,6 +322,7 @@ class SurveyPrice():
return merged_df
def calculate_all_price(self):
self.download_price_card()
self.get_all_surveys_from_hubspot()
self.get_all_surveyed_data_from_sharepoint()
submission_data = self.merge_hub_spot_and_survey_information()
@ -336,11 +341,7 @@ class SurveyPrice():
else:
# Cavity wall
sheet_name = f'{self.installer[row["HUBSPOT_INSTALLER"]]} - {row["HUBSPOT_WORK_TYPE"].upper()}'
if row['HUBSPOT_WALL_INSULATION'].upper() == "BEAD/FIBRE/WOOL/OTHER":
sheet_name += " - GENERAL"
elif row['HUBSPOT_WALL_INSULATION'].upper() == "EMPTY":
pass
else:
if row['HUBSPOT_WALL_INSULATION'].upper() == "FORMALDEHYDE UFFI FOAM":
sheet_name += " - FOAM"
sheet_name = self.hubspot_job_to_price_sheet_convertor[sheet_name]
price_matrix = self.get_price_matrix(sheet_name)

View file

@ -1,6 +1,19 @@
from etl.pdfReader.pdfReaderToText import pdfReaderToText
from etl.pdfReader.reportType import ReportType
import math
from etl.load.preSiteNoteTypes import (
AssessorInfo, CompanyInfo,
PreSiteNotesSummaryInfo,
PreSiteNote,
PropertyDescription, Dimension, HeatingType, Heating, HeatingSystemControls,
OtherDetails, WindTurbine, PhotovoltaicPanel, FlueGasHeatRecoverySystem, ShowerAndBaths,
SolarWaterHeating, HotWaterCylinder, WaterHeating, Lighting, VentilationAndCooling,
Door, Walls, Roofs, Floors, PropertyDetail, Windows
)
from etl.load.topLevel import(
Buildings, Documents
)
import uuid
class surveyedDataProcessor():
def __init__(self, address, files):
@ -9,12 +22,12 @@ class surveyedDataProcessor():
self.pre_site_note = None
self.csr = None
self.identify_files()
self.hubspot_deal_id = None
def identify_files(self):
for file in self.files:
pdf = pdfReaderToText(file)
print(file)
if pdf:
if pdf.type == ReportType.QUIDOS_PRESITE_NOTE:
self.pre_site_note = pdf.get_reader()
@ -22,6 +35,423 @@ class surveyedDataProcessor():
elif pdf.type == ReportType.CHARTED_SURVEYOR_REPORT:
self.csr = pdf.get_reader()
def load_pre_site_notes_summary_table(self, db_session):
    """Store the pre-site note's survey summary, keyed on its reference number.

    The survey information is dumped to a plain dict and handed to
    ``upsert_record`` with ``reference_number`` as the lookup field.

    Args:
        db_session: Database session passed through to ``upsert_record``.

    Returns:
        The PreSiteNotesSummaryInfo record returned by ``upsert_record``.
    """
    survey_info = self.pre_site_note.survey_information
    return self.upsert_record(
        db_session=db_session,
        model_class=PreSiteNotesSummaryInfo,
        data_dict=survey_info.model_dump(),
        lookup_field="reference_number",
    )
def create_building_table(self, db_session):
    """Upsert a Buildings row for this survey, keyed on the survey's UPRN.

    NOTE(review): apart from the UPRN, every value here is a hard-coded
    placeholder ("foo", "foobar", "landlord_id"). ``create_buildings_table``
    in this class builds the same row from real survey data and
    caller-supplied ids; confirm whether this method is still needed.

    Args:
        db_session: Database session passed through to ``upsert_record``.

    Returns:
        The Buildings record returned by ``upsert_record``.

    Raises:
        ValueError: Raised by ``upsert_record`` when the survey has no UPRN.
    """
    return self.upsert_record(
        db_session=db_session,
        model_class=Buildings,
        data_dict={
            "address": "foo",
            # Was misspelt "potcode": upsert_record drops keys that are not
            # attributes of the model, so the value never reached the row.
            "postcode": "foobar",
            "UPRN": self.pre_site_note.survey_information.uprn,
            "landlord_id": "landlord_id",
            "domna_id": "landlord_id",
        },
        lookup_field="UPRN",
    )
def get_attribute_and_load(self, obj, attr_string, pydanticModel, db_session):
    """Insert the sub-object stored at ``obj.<attr_string>`` if there is one.

    Args:
        obj: Object that may carry the attribute.
        attr_string: Name of the attribute to look up on ``obj``.
        pydanticModel: Model class handed to ``upsert_record``.
        db_session: Database session passed through to ``upsert_record``.

    Returns:
        The record returned by ``upsert_record``, or None when the attribute
        is missing/falsy or its ``model_dump()`` is empty.
    """
    found = getattr(obj, attr_string, None)
    if not found:
        return None

    print(f"Uploading to data base {found}")
    payload = found.model_dump()
    print(f"Uploaded to database with this dict {payload}")
    if not payload:
        return None

    # No lookup field: these child rows are always inserted fresh.
    return self.upsert_record(
        db_session=db_session,
        model_class=pydanticModel,
        data_dict=payload,
        lookup_field=None
    )
def load_property_description(self, db_session):
    """Persist the pre-site note's property description and its child rows.

    Child records are inserted first so their ids can be stored as foreign
    keys on the PropertyDescription row. Insert order is: secondary heating,
    main heating 2 (controls then system), main heating (controls then
    system), the optional sections (other details ... door), the main and
    extension property details with their dimensions and windows, and
    finally the property description itself.

    Args:
        db_session: Database session passed through to ``upsert_record``
            (which commits after every insert).

    Returns:
        The PropertyDescription record returned by ``upsert_record``.
    """
    description = self.pre_site_note.property_description

    def check_if_attribute_exists(obj, attribute):
        # Present *and* truthy: a section that is None/empty counts as absent.
        return bool(getattr(obj, attribute, None))

    def id_or_none(record):
        # Optional sections may not have produced a row at all.
        return record.id if record else None

    def load_section(attribute, model_class):
        # Insert one optional sub-section of the description, if present.
        return self.get_attribute_and_load(description, attribute, model_class, db_session)

    def load_heating(attribute, echo=False):
        # Insert one heating system plus its controls; returns (heating, controls).
        heating = None
        controls = None
        if check_if_attribute_exists(description, attribute):
            heating_obj = getattr(description, attribute)
            if check_if_attribute_exists(heating_obj, "controls"):
                if echo:
                    # Debug output kept from the original main-heating branch.
                    print(heating_obj)
                controls = self.get_attribute_and_load(
                    heating_obj, "controls", HeatingSystemControls, db_session
                )
            data = heating_obj.model_dump()
            if data:
                heating = self.upsert_record(
                    db_session=db_session,
                    model_class=Heating,
                    data_dict=data,
                    lookup_field=None,
                    # Guarded: controls can be None when the system has no
                    # controls section or its dump was empty. The unguarded
                    # ``controls.id`` used to raise AttributeError here.
                    additional_fields={"controls_id": id_or_none(controls)},
                )
        return heating, controls

    def upload_property_detail(property_part="main_property"):
        # Insert one property part (main building or an extension) together
        # with its wall/roof/floor, dimensions and windows. Returns None when
        # the part is absent.
        if not check_if_attribute_exists(description, property_part):
            return None
        part = getattr(description, property_part)

        # get_attribute_and_load already returns None for a missing section.
        wall = self.get_attribute_and_load(part, "wall", Walls, db_session)
        roof = self.get_attribute_and_load(part, "roof", Roofs, db_session)
        floor = self.get_attribute_and_load(part, "floor", Floors, db_session)

        property_detail = self.upsert_record(
            db_session=db_session,
            model_class=PropertyDetail,
            data_dict={
                "age_band": part.age_band,
                "floor_id": id_or_none(floor),
                "roof_id": id_or_none(roof),
                "wall_id": id_or_none(wall),
            },
            lookup_field=None,
        )

        # Dimensions and windows are child rows pointing back at the detail.
        for attribute, model_class in (("dimensions", Dimension), ("windows", Windows)):
            for item in getattr(part, attribute, None) or ():
                self.upsert_record(
                    db_session=db_session,
                    model_class=model_class,
                    data_dict=item.model_dump(),
                    lookup_field=None,
                    additional_fields={"property_detail_id": property_detail.id},
                )
        return property_detail

    # Secondary heating
    secondary_heating = load_section("secondaryHeatingType", HeatingType)
    # Main heating 2 and its controls
    mainheating2, mainheating2controls = load_heating("mainHeating2")
    # Main heating and its controls
    mainheating, mainheatingcontrols = load_heating("mainHeating", echo=True)

    foreign_keys = {
        "main_heating_id": id_or_none(mainheating),
        "main_heating_controls_id": id_or_none(mainheatingcontrols),
        "main_heating2_id": id_or_none(mainheating2),
        "main_heating2_controls_id": id_or_none(mainheating2controls),
        "secondary_heating_type_id": id_or_none(secondary_heating),
    }

    # (foreign-key column, attribute on the description, model class)
    optional_sections = (
        ("other_details_id", "otherDetails", OtherDetails),
        ("wind_turbine_id", "windTurbine", WindTurbine),
        ("photovoltaic_panel_id", "photovoltaicPanel", PhotovoltaicPanel),
        ("flue_gas_heat_recovery_system_id", "flueGasHeatRecoverySystem", FlueGasHeatRecoverySystem),
        ("shower_and_baths_id", "showerAndBaths", ShowerAndBaths),
        ("solar_water_heating_id", "solarWaterHeating", SolarWaterHeating),
        ("hot_water_cylinder_id", "hotWaterCylinder", HotWaterCylinder),
        ("water_heating_id", "waterHeating", WaterHeating),
        ("lighting_id", "lighting", Lighting),
        ("ventilation_and_cooling_id", "ventilationAndCooling", VentilationAndCooling),
        ("door_id", "door", Door),
    )
    for column, attribute, model_class in optional_sections:
        foreign_keys[column] = id_or_none(load_section(attribute, model_class))

    # Main property followed by up to four extensions.
    for property_part in (
        "main_property",
        "ex1_property",
        "ex2_property",
        "ex3_property",
        "ex4_property",
    ):
        foreign_keys[f"{property_part}_id"] = id_or_none(
            upload_property_detail(property_part)
        )

    # Nested sections were stored as their own rows above, so only the
    # non-dict, non-list values stay on the description row itself.
    data = {
        key: value
        for key, value in description.model_dump().items()
        if not isinstance(value, (dict, list))
    }

    property_description = self.upsert_record(
        db_session=db_session,
        model_class=PropertyDescription,
        data_dict=data,
        lookup_field=None,
        additional_fields=foreign_keys,
    )
    return property_description
def load_company_table(self, db_session):
    """Store the assessor's company, keyed on its trading name.

    Args:
        db_session: Database session passed through to ``upsert_record``.

    Returns:
        The CompanyInfo record returned by ``upsert_record``.
    """
    company_info = self.pre_site_note.company_information
    return self.upsert_record(
        db_session=db_session,
        model_class=CompanyInfo,
        data_dict=company_info.model_dump(),
        lookup_field="trading_name",
    )
def create_document_table_via_pre_site_note(self, db_session, pre_site_note, assessor, building):
    """Insert a Documents row that points at a stored pre-site note.

    Args:
        db_session: Database session passed through to ``upsert_record``.
        pre_site_note: Stored pre-site note record; its ``id`` becomes
            ``target_id``.
        assessor: Stored assessor record; its ``id`` becomes ``assessor_id``.
        building: Stored building record; its ``id`` becomes ``building_id``.

    Returns:
        The Documents record returned by ``upsert_record``.
    """
    # created_at comes from the parsed note held on self, not from the
    # database record passed in as ``pre_site_note``.
    return self.upsert_record(
        db_session=db_session,
        model_class=Documents,
        data_dict={
            "assessor_id": assessor.id,
            "created_at": self.pre_site_note.survey_information.inspection_date,
            "document_type": ReportType.QUIDOS_PRESITE_NOTE,
            "building_id": building.id,
            "target_table": "pre_site_note",
            "target_id": pre_site_note.id,
        },
        lookup_field=None,
    )
def create_buildings_table(
    self,
    db_session,
    landlord_id,
    domna_id,
):
    """Upsert the Buildings row for the surveyed property, keyed on UPRN.

    Address, postcode and UPRN are read from the pre-site note's survey
    information; the two ids are supplied by the caller.

    Args:
        db_session: Database session passed through to ``upsert_record``.
        landlord_id: Value stored in the ``landlord_id`` column.
        domna_id: Value stored in the ``domna_id`` column.

    Returns:
        The Buildings record returned by ``upsert_record``.

    Raises:
        ValueError: Raised by ``upsert_record`` when the survey has no UPRN.
    """
    survey_info = self.pre_site_note.survey_information
    return self.upsert_record(
        db_session=db_session,
        model_class=Buildings,
        data_dict={
            "address": survey_info.address,
            "postcode": survey_info.postcode,
            "UPRN": survey_info.uprn,
            "landlord_id": landlord_id,
            "domna_id": domna_id,
        },
        lookup_field="UPRN",
    )
def create_pre_site_note_table(
    self,
    db_session,
    assessor,
    summary_info,
    pre_site_note_description,
):
    """Insert the PreSiteNote row linking summary, assessor and description.

    Args:
        db_session: Database session; the new row is added and committed.
        assessor: Stored assessor record (its ``id`` is used).
        summary_info: Stored summary record (its ``id`` is used).
        pre_site_note_description: Stored property description record
            (its ``id`` is used).

    Returns:
        The newly created PreSiteNote instance.
    """
    link_ids = {
        "summary_info_id": summary_info.id,
        "assessor_id": assessor.id,
        "pre_site_note_description_id": pre_site_note_description.id,
    }
    record = PreSiteNote(**link_ids)
    db_session.add(record)
    db_session.commit()
    return record
def upsert_record(
    self,
    db_session,
    model_class,
    data_dict,
    lookup_field,
    update_if_exists: bool = False,
    additional_fields: dict = None
):
    """Return the row matching ``lookup_field``, or insert a new one.

    Args:
        db_session: Session used for the lookup query, ``add`` and ``commit``.
        model_class: Model class to query and instantiate.
        data_dict: Field values for the record. It is not modified.
        lookup_field: Name of the field used to find an existing row, or
            None to always insert a new row.
        update_if_exists: When True and a matching row exists, every
            key/value of the merged data is set on that row and committed.
        additional_fields: Extra values merged over ``data_dict`` (e.g.
            foreign keys).

    Returns:
        The existing record when one matches, otherwise the newly inserted
        record.

    Raises:
        ValueError: If ``lookup_field`` is given but its value is missing or
            falsy in the merged data.
    """
    # Copy first: merging additional_fields used to write into the caller's
    # dict, leaking foreign keys into data the caller may reuse.
    clean_data = dict(data_dict)
    if additional_fields:
        clean_data.update(additional_fields)

    if lookup_field is not None:
        lookup_value = clean_data.get(lookup_field)
        if not lookup_value:
            raise ValueError(f"Missing lookup field '{lookup_field}' in data.")

        # Try to find an existing record.
        existing_record = db_session.query(model_class).filter(
            getattr(model_class, lookup_field) == lookup_value
        ).first()

        if existing_record:
            if update_if_exists:
                for key, value in clean_data.items():
                    setattr(existing_record, key, value)
                db_session.commit()
            return existing_record

    # Drop keys the model does not declare so the constructor only receives
    # attributes that exist on the model class.
    clean_data = {
        field: value
        for field, value in clean_data.items()
        if hasattr(model_class, field)
    }
    print(f'clean data is {clean_data}')

    new_record = model_class(**clean_data)
    db_session.add(new_record)
    db_session.commit()
    return new_record
def load_assessor_table(self, db_session):
    """Store the assessor, linked to their company, keyed on accreditation number.

    The company is stored first via ``load_company_table`` so that its id can
    be attached to the assessor as ``company_id``.

    Args:
        db_session: Database session passed through to ``upsert_record``.

    Returns:
        The AssessorInfo record returned by ``upsert_record``.
    """
    company = self.load_company_table(db_session)
    assessor_info = self.pre_site_note.assessor_information
    return self.upsert_record(
        db_session=db_session,
        model_class=AssessorInfo,
        data_dict=assessor_info.model_dump(),
        lookup_field="accreditation_number",
        additional_fields={"company_id": company.id},
    )
def get_insulation_info(self):
if self.csr:
if self.csr.insulation_info:
@ -142,6 +572,4 @@ class surveyedDataProcessor():
def get_current_sap_score(self):
    """Return the current SAP score parsed from the pre-site note summary.

    The ``current_sap`` text is split on single spaces and its second token
    is converted to an int.

    Returns:
        The score as an int.
    """
    current_sap = self.pre_site_note.survey_information.current_sap
    # Second space-separated token holds the numeric score.
    return int(current_sap.split(" ")[1])

View file

@ -2,7 +2,7 @@
import os
from etl.surveyedData.surveryedData import surveyedDataProcessor
from etl.db.db import get_db_session, init_db
from etl.transform.types import AssessorInfo
from etl.transform.preSiteNoteTypes import AssessorInfo
import pytest
from etl.jjc_old_lewis_manual_way_ import work_out_total_floor_area

View file

@ -2,7 +2,7 @@
import os
from etl.surveyedData.surveryedData import surveyedDataProcessor
from etl.db.db import get_db_session, init_db
from etl.transform.types import AssessorInfo
from etl.transform.preSiteNoteTypes import AssessorInfo
import pytest
from etl.jjc_old_lewis_manual_way_ import work_out_total_floor_area

View file

@ -1,4 +1,4 @@
from sqlmodel import Field, SQLModel
from sqlmodel import Field, SQLModel, Relationship
import uuid
from datetime import datetime
from pydantic import field_validator, EmailStr
@ -7,10 +7,7 @@ from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
class BaseModel(SQLModel):
id: uuid.UUID = Field(
default_factory=uuid.uuid4,
sa_column=Column(UUID(as_uuid=True), primary_key=True)
)
pass
class Dimension(BaseModel):
floor_area_m2: float
@ -19,6 +16,7 @@ class Dimension(BaseModel):
party_wall_length_m: float
class CompanyInfo(BaseModel):
address: str
trading_name: str
post_code: str
fax_number: Optional[str] = None
@ -30,7 +28,7 @@ class CompanyInfo(BaseModel):
return None
return v
class SurverySummaryInfo(BaseModel):
class PreSiteNotesSummaryInfo(BaseModel):
reference_number: str
epc_language: str
uprn: Optional[str] = ""
@ -79,12 +77,13 @@ class Door(BaseModel):
no_of_insulated_doors: int
u_value_w_m2_k: Optional[str]
class AssessorInfo(BaseModel, table=True):
class AssessorInfo(BaseModel):
accreditation_number: str
name: str
phone_number: Optional[str] = None
email_address: Optional[EmailStr] = None
class VentilationAndCooling(BaseModel):
no_of_open_fireplaces: int
ventilation_type: str

View file

@ -1,2 +1,4 @@
poetry run alembic revision --autogenerate -m "some msg"
#poetry upgrade head
#poetry run alembic revision --autogenerate -m "Add address in company"
poetry run alembic upgrade head

1065
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,10 +19,10 @@ dependencies = [
"pydantic-settings (>=2.8.1,<3.0.0)",
"alembic (>=1.15.1,<2.0.0)",
"pytest (>=8.3.5,<9.0.0)",
"hubspot-api-client (>=11.1.0,<12.0.0)",
"monday (>=2.0.1,<3.0.0)",
"beautifulsoup4 (>=4.13.4,<5.0.0)",
"tqdm (>=4.67.1,<5.0.0)",
"hubspot-api-client (>=12.0.0,<13.0.0)",
]
[tool.poetry]