"""FastAPI application entry point.

Wires together the API router, static file mounts, the database/bootstrap
logic that runs on startup, the snapshot WebSocket and the SPA catch-all route.
"""

import asyncio
import json
import logging
import os
import time
from typing import Optional

import pymysql
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy import text

from backend.app.api.api import api_router
from backend.app.core import security
from backend.app.core.config import settings
from backend.app.core.database import engine, SessionLocal
from backend.app.models import sql_models
from backend.app.services.scheduler import init_scheduler
from backend.app.services.video_core import video_manager

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directories served by this module.
STATIC_DIR = "backend/app/static"
DIST_DIR = "backend/dist"
DIST_ASSETS_DIR = "backend/dist/assets"

# MySQL client error code treated as "server not reachable yet".
_MYSQL_CONNECTION_REFUSED = 2003

# Bootstrap admin account that is (re)initialised on every startup.
ADMIN_USERNAME = "admin"
# NOTE(review): the fallback below is a hard-coded credential kept only for
# backward compatibility, and the admin password is reset to this value on
# every startup. Set the ADMIN_PASSWORD environment variable in any real
# deployment and consider removing the fallback.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "HNYZ0821")

app = FastAPI(title=settings.PROJECT_NAME)

# CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# API Routes
app.include_router(api_router, prefix=settings.API_V1_STR)

# Static Files
# Ensure directories exist before mounting them (StaticFiles is pointed at
# these paths below).
os.makedirs(STATIC_DIR, exist_ok=True)
os.makedirs(DIST_DIR, exist_ok=True)
os.makedirs(DIST_ASSETS_DIR, exist_ok=True)

app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


def init_db_if_not_exists(max_retries: int = 30, retry_interval: float = 2) -> None:
    """Create the configured MySQL database if it does not exist yet.

    Connects to the MySQL server (without selecting a database) and issues
    ``CREATE DATABASE IF NOT EXISTS``. Any failure is logged and retried; the
    function never raises on connection problems, it only logs an error once
    all attempts are exhausted.

    Args:
        max_retries: Maximum number of connection attempts.
        retry_interval: Seconds to wait between attempts.
    """
    # Quote the identifier so names containing e.g. hyphens are accepted and a
    # stray backtick in the setting cannot break out of the identifier.
    quoted_db = "`" + str(settings.MYSQL_DB).replace("`", "``") + "`"

    for attempt in range(1, max_retries + 1):
        try:
            logger.info(
                f"Connecting to MySQL server at {settings.MYSQL_SERVER} to check database "
                f"(Attempt {attempt}/{max_retries})..."
            )
            conn = pymysql.connect(
                host=settings.MYSQL_SERVER,
                user=settings.MYSQL_USER,
                password=settings.MYSQL_PASSWORD,
                port=int(settings.MYSQL_PORT),
                charset="utf8mb4",
                cursorclass=pymysql.cursors.DictCursor,
            )
            try:
                with conn.cursor() as cursor:
                    logger.info(f"Checking database {settings.MYSQL_DB}...")
                    cursor.execute(
                        f"CREATE DATABASE IF NOT EXISTS {quoted_db} "
                        "CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
                    )
                    logger.info("Database checked/created.")
            finally:
                # Always release the connection, even if the statement failed.
                conn.close()
            return  # Success
        except pymysql.err.OperationalError as e:
            if e.args and e.args[0] == _MYSQL_CONNECTION_REFUSED:
                logger.warning(f"Database not ready yet, retrying in {retry_interval}s...")
            else:
                # Possibly transient; retry rather than abort startup.
                logger.error(f"Database initialization failed: {e}")
        except Exception as e:
            logger.error(f"Database initialization failed: {e}")

        # No point in waiting after the final attempt.
        if attempt < max_retries:
            time.sleep(retry_interval)

    logger.error("Could not connect to database after multiple attempts.")


def _run_pending_migrations() -> None:
    """Add the ``users.is_superuser`` column when it is missing.

    Probes the column with a SELECT; if the probe fails the column is added
    with ALTER TABLE. Errors are logged and rolled back, never raised.
    """
    db = SessionLocal()
    try:
        logger.info("Checking for pending migrations...")
        try:
            db.execute(text("SELECT is_superuser FROM users LIMIT 1"))
        except Exception:
            # Clear the failed statement's transaction state before the DDL.
            db.rollback()
            logger.info("Column 'is_superuser' missing in users table. Adding it...")
            db.execute(text("ALTER TABLE users ADD COLUMN is_superuser BOOLEAN DEFAULT FALSE"))
            db.commit()
            logger.info("Column 'is_superuser' added.")
    except Exception as e:
        logger.error(f"Migration error: {e}")
        db.rollback()
    finally:
        db.close()


def _ensure_admin_user(db) -> None:
    """Create the bootstrap admin user, or reset it if it already exists.

    An existing user gets its password reset and is marked active and
    superuser. Errors are logged and the session is rolled back so that it
    stays usable for the caller.

    Args:
        db: An open session created by ``SessionLocal``.
    """
    username = ADMIN_USERNAME
    try:
        admin = (
            db.query(sql_models.User)
            .filter(sql_models.User.username == username)
            .first()
        )
        hashed_pwd = security.get_password_hash(ADMIN_PASSWORD)
        if admin:
            logger.info(f"User '{username}' found. Resetting password...")
            admin.hashed_password = hashed_pwd
            admin.is_active = True
            admin.is_superuser = True
            db.commit()
            logger.info(f"Password for '{username}' has been reset and superuser granted.")
        else:
            logger.info(f"User '{username}' not found. Creating...")
            user = sql_models.User(
                username=username,
                hashed_password=hashed_pwd,
                is_active=True,
                is_superuser=True,
            )
            db.add(user)
            db.commit()
            logger.info(f"User '{username}' created.")
    except Exception as e:
        logger.error(f"Error initializing admin user: {e}")
        # Leave the session in a clean state for subsequent queries.
        db.rollback()


def _load_camera_streams(db) -> None:
    """Register every stored camera that has a stream URL with the video manager.

    Errors are logged, never raised.

    Args:
        db: An open session created by ``SessionLocal``.
    """
    try:
        for cam in db.query(sql_models.Camera).all():
            if cam.stream_url:
                video_manager.add_stream(cam.id, cam.stream_url)
    except Exception as e:
        logger.error(f"Error loading cameras: {e}")


# Startup Event
@app.on_event("startup")
async def startup_event():
    """Bootstrap the database, admin user, camera streams and scheduler."""
    logger.info("Starting up...")

    # 1. Ensure Database Exists (from init_db_script.py)
    init_db_if_not_exists()

    # 2. Ensure Tables Exist
    try:
        sql_models.Base.metadata.create_all(bind=engine)
        logger.info("Tables created/verified.")
    except Exception as e:
        logger.error(f"Error creating tables: {e}")

    # 2.5 Migration: Check for new columns
    _run_pending_migrations()

    # 3. Init Admin User (from reset_admin.py) and video streams
    db = SessionLocal()
    try:
        _ensure_admin_user(db)
        _load_camera_streams(db)
    finally:
        db.close()

    # Init Scheduler
    init_scheduler()


# WebSocket
@app.websocket("/ws/stream")
async def websocket_endpoint(websocket: WebSocket):
    """Push the video manager's snapshots to the client as JSON once per second."""
    await websocket.accept()
    try:
        while True:
            # 1 FPS
            await asyncio.sleep(1)
            snapshot_data = video_manager.get_all_snapshots_base64()
            await websocket.send_text(json.dumps(snapshot_data))
    except WebSocketDisconnect:
        logger.info("WebSocket disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")


# Frontend Static (Production)
app.mount("/assets", StaticFiles(directory=DIST_ASSETS_DIR), name="assets")


def _resolve_dist_file(relative_path: str) -> Optional[str]:
    """Return the absolute path of a file inside ``DIST_DIR``, or ``None``.

    The request path is untrusted: ``..`` segments or an absolute path would
    otherwise let ``os.path.join`` escape the dist directory. The candidate is
    therefore resolved and only accepted when it is located under the resolved
    dist root and is a regular file.
    """
    dist_root = os.path.realpath(DIST_DIR)
    try:
        candidate = os.path.realpath(os.path.join(dist_root, relative_path))
        if os.path.commonpath([dist_root, candidate]) != dist_root:
            return None
    except ValueError:
        # Embedded NUL bytes or paths on a different drive: treat as not found.
        return None
    return candidate if os.path.isfile(candidate) else None


@app.get("/{full_path:path}")
async def catch_all(full_path: str):
    """Serve a built frontend file, falling back to ``index.html`` for SPA routes.

    Routes registered earlier (API, static mounts) take precedence over this
    handler.

    Raises:
        HTTPException: 404 when neither the requested file nor ``index.html``
            exists in the dist directory.
    """
    file_path = _resolve_dist_file(full_path)
    if file_path is not None:
        return FileResponse(file_path)

    # Default to index.html for SPA
    index_path = os.path.join(DIST_DIR, "index.html")
    if os.path.isfile(index_path):
        return FileResponse(index_path)

    raise HTTPException(status_code=404, detail="Not Found")