import datetime import pytz from sqlalchemy import select from sqlalchemy.orm import Session from sqlalchemy.orm.exc import NoResultFound from backend.app.db.models.solar import Solar, SolarScenario def get_solar_data(session: Session, longitude: float = None, latitude: float = None, uprn: str = None): """ This function will fetch data from the solar table based on longitude and latitude or UPRN. :param session: The database session :param longitude: The longitude to search for :param latitude: The latitude to search for :param uprn: The UPRN to search for (overrides longitude and latitude if provided) :return: The google_api_response and updated_at fields """ try: if uprn: # Search by UPRN solar_data = session.query(Solar.google_api_response, Solar.updated_at).filter_by(uprn=uprn).one() else: # Search by longitude and latitude solar_data = session.query(Solar.google_api_response, Solar.updated_at).filter( Solar.longitude == longitude, Solar.latitude == latitude ).one() # Check if updated_at is more than 6 months old six_months_ago = datetime.datetime.now(pytz.utc) - datetime.timedelta(days=6 * 30) # Approximate 6 months is_outdated = solar_data.updated_at < six_months_ago return solar_data.google_api_response, solar_data.updated_at, is_outdated except NoResultFound: return None, None, False def store_batch_data(session: Session, api_data: dict, uprns_to_location: list, scenarios_data: list): """ This function will store the API data to the solar table against all of the UPRNs with longitude and latitude. If a record already exists in the Solar table by UPRN, it will be updated instead of creating a new one. Similarly, if a scenario exists in SolarScenario by number_panels, it will also be updated. :param session: The database session :param api_data: The API data to store :param uprns_to_location: A list of dictionaries containing UPRN, longitude, and latitude :param scenarios_data: A list of dictionaries containing scenario data for each UPRN """ try: # Insert or update data into the Solar table for data in uprns_to_location: existing_solar = session.execute(select(Solar).where(Solar.uprn == data['uprn'])).scalar_one_or_none() if existing_solar: # Update the existing record existing_solar.longitude = data['longitude'] existing_solar.latitude = data['latitude'] existing_solar.google_api_response = api_data existing_solar.updated_at = datetime.datetime.now(pytz.utc) solar_id = existing_solar.id else: # Insert a new record solar_record = Solar( uprn=data['uprn'], longitude=data['longitude'], latitude=data['latitude'], google_api_response=api_data, updated_at=datetime.datetime.now(pytz.utc) ) session.add(solar_record) session.flush() # Flush to get the IDs generated session.refresh(solar_record) # Refresh to populate the ID field solar_id = solar_record.id # Insert or update data in the SolarScenario table for scenario in scenarios_data: existing_scenario = session.execute( select(SolarScenario).where( SolarScenario.solar_id == solar_id, SolarScenario.number_panels == scenario['number_panels'] ) ).scalar_one_or_none() if existing_scenario: # Update the existing scenario record existing_scenario.scenario_type = scenario['scenario_type'] existing_scenario.array_kwhp = scenario['array_kwhp'] existing_scenario.lifetime_dc_kwh = scenario['lifetime_dc_kwh'] existing_scenario.yearly_dc_kwh = scenario['yearly_dc_kwh'] existing_scenario.lifetime_ac_kwh = scenario.get('lifetime_ac_kwh') # Optional field existing_scenario.yearly_ac_kwh = scenario.get('yearly_ac_kwh') # Optional field existing_scenario.cost = scenario['cost'] existing_scenario.expected_payback_years = scenario.get('expected_payback_years') # Optional field existing_scenario.panelled_roof_area = scenario['panelled_roof_area'] existing_scenario.is_default = scenario['is_default'] else: # Insert a new scenario record scenario_record = SolarScenario( solar_id=solar_id, scenario_type=scenario['scenario_type'], number_panels=scenario['number_panels'], array_kwhp=scenario['array_kwhp'], lifetime_dc_kwh=scenario['lifetime_dc_kwh'], yearly_dc_kwh=scenario['yearly_dc_kwh'], lifetime_ac_kwh=scenario.get('lifetime_ac_kwh'), # Optional field yearly_ac_kwh=scenario.get('yearly_ac_kwh'), # Optional field cost=scenario['cost'], expected_payback_years=scenario.get('expected_payback_years'), # Optional field panelled_roof_area=scenario['panelled_roof_area'], is_default=scenario['is_default'] ) session.add(scenario_record) # Commit the changes after all operations session.commit() except Exception as e: session.rollback() raise e