"""Task management endpoints: CRUD, Excel template export/import,
scheduler toggling, and ad-hoc pipeline test runs."""
import io
import json
from typing import Any, List

import openpyxl
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import Response
from sqlalchemy.orm import Session

from backend.app.api import deps
from backend.app.core.database import get_db
from backend.app.models import sql_models
from backend.app.schemas import schemas
from backend.app.services.scheduler import run_analysis_pipeline, update_job

router = APIRouter()

# Single source of truth for the Excel template columns: the exporter writes
# them in this order and the importer validates that all are present, so the
# two endpoints can never drift apart.
TEMPLATE_HEADERS = [
    "Task Name",
    "Model Config Name",
    "Camera Names",
    "Cron Expression",
    "Rules (JSON)",
]


@router.get("", response_model=List[schemas.Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """Return a paginated list of tasks."""
    return db.query(sql_models.Task).offset(skip).limit(limit).all()


@router.get("/export_template")
def export_template(
    db: Session = Depends(get_db),
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """
    Export task configuration template (including current data)
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Tasks"
    ws.append(TEMPLATE_HEADERS)

    # Pre-fetch all camera names once so the per-task loop issues no queries.
    all_cameras = {c.id: c.name for c in db.query(sql_models.Camera).all()}

    for task in db.query(sql_models.Task).all():
        model_name = task.model_config.name if task.model_config else ""
        # Resolve camera ids to names, silently dropping ids with no
        # matching camera (same behavior as the importer's skip logic).
        cam_names = [
            all_cameras[cid]
            for cid in (task.camera_ids or [])
            if cid in all_cameras
        ]
        rules_str = json.dumps(task.rules, ensure_ascii=False) if task.rules else "[]"
        ws.append(
            [task.name, model_name, ",".join(cam_names), task.cron_expression, rules_str]
        )

    # Fixed column widths chosen for readability of typical content.
    for col, width in zip("ABCDE", (25, 25, 40, 20, 50)):
        ws.column_dimensions[col].width = width

    buffer = io.BytesIO()
    wb.save(buffer)
    buffer.seek(0)
    return Response(
        content=buffer.getvalue(),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": "attachment; filename=tasks_template.xlsx"},
    )


def _parse_rules(rules_str: Any) -> list:
    """Parse the 'Rules (JSON)' cell value; fall back to [] on bad JSON.

    A malformed cell imports the task without rules instead of failing
    the whole upload.
    """
    if not rules_str:
        return []
    try:
        return json.loads(rules_str)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers non-string cell values (e.g. numbers).
        return []


@router.post("/import_template")
async def import_template(
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """
    Import tasks from template

    Upserts by task name: existing tasks are updated in place (and their
    scheduler job refreshed if running), unknown names create new tasks.
    """
    # NOTE(review): openpyxl reads only .xlsx; a legacy .xls passes this
    # extension check but fails inside load_workbook with an "Import
    # failed" 400 — consider rejecting .xls up front.
    if not file.filename.endswith(('.xlsx', '.xls')):
        raise HTTPException(status_code=400, detail="Invalid file format. Please upload an Excel file.")

    try:
        contents = await file.read()
        wb = openpyxl.load_workbook(io.BytesIO(contents))
        ws = wb.active

        rows = list(ws.rows)
        if not rows:
            return {"message": "Empty file"}

        headers = [cell.value for cell in rows[0]]
        required_headers = TEMPLATE_HEADERS
        if not all(h in headers for h in required_headers):
            raise HTTPException(
                status_code=400,
                detail=f"Invalid template format. Required columns: {', '.join(required_headers)}",
            )

        name_idx = headers.index("Task Name")
        model_idx = headers.index("Model Config Name")
        cams_idx = headers.index("Camera Names")
        cron_idx = headers.index("Cron Expression")
        rules_idx = headers.index("Rules (JSON)")

        added_count = 0
        updated_count = 0

        # Pre-fetch name -> id lookup maps so row processing issues no queries.
        model_map = {m.name: m.id for m in db.query(sql_models.ModelConfig).all()}
        camera_map = {c.name: c.id for c in db.query(sql_models.Camera).all()}

        for row in rows[1:]:
            task_name = row[name_idx].value
            if not task_name:
                continue  # blank row

            # Rows referencing an unknown model config are skipped, not errors.
            model_config_id = model_map.get(row[model_idx].value)
            if not model_config_id:
                continue

            # Resolve comma-separated camera names; unknown names are dropped.
            camera_names_str = row[cams_idx].value
            camera_ids = []
            if camera_names_str:
                for cname in str(camera_names_str).split(","):
                    cname = cname.strip()
                    if cname in camera_map:
                        camera_ids.append(camera_map[cname])

            cron_expr = row[cron_idx].value
            rules = _parse_rules(row[rules_idx].value)

            existing_task = (
                db.query(sql_models.Task)
                .filter(sql_models.Task.name == task_name)
                .first()
            )
            if existing_task:
                existing_task.model_config_id = model_config_id
                existing_task.camera_ids = camera_ids
                existing_task.rules = rules
                existing_task.cron_expression = cron_expr
                # Keep the live schedule in sync with the new cron expression.
                if existing_task.is_running:
                    update_job(existing_task.id, existing_task.cron_expression, True)
                updated_count += 1
            else:
                db.add(sql_models.Task(
                    name=task_name,
                    model_config_id=model_config_id,
                    camera_ids=camera_ids,
                    rules=rules,
                    cron_expression=cron_expr,
                    is_running=False,
                ))
                added_count += 1

        db.commit()
        return {
            "message": "Import successful",
            "added": added_count,
            "updated": updated_count,
        }
    except HTTPException:
        raise  # re-raise our own validation errors untouched
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Import failed: {str(e)}")


@router.post("", response_model=schemas.Task)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: schemas.TaskCreate,
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """Create a new task from the request body."""
    task = sql_models.Task(**task_in.model_dump())
    db.add(task)
    db.commit()
    db.refresh(task)
    return task


@router.put("/{id}", response_model=schemas.Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    task_in: schemas.TaskUpdate,
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """Update a task; refreshes its scheduler job if it is running.

    Raises 404 if the task does not exist.
    """
    task = db.query(sql_models.Task).filter(sql_models.Task.id == id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # exclude_unset=True so a partial payload does not clobber
    # unspecified fields with None.
    for key, value in task_in.model_dump(exclude_unset=True).items():
        setattr(task, key, value)

    db.commit()
    db.refresh(task)

    # Update scheduler if running
    if task.is_running:
        update_job(task.id, task.cron_expression, True)
    return task


@router.delete("/{id}", response_model=schemas.Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """Delete a task, its scheduler job, and its associated logs.

    Raises 404 if the task does not exist.
    """
    task = db.query(sql_models.Task).filter(sql_models.Task.id == id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Stop scheduler job before removing the row.
    update_job(task.id, task.cron_expression, False)

    # Delete associated logs first so the task row can be removed cleanly.
    db.query(sql_models.TaskLog).filter(sql_models.TaskLog.task_id == id).delete()
    db.delete(task)
    db.commit()
    return task


@router.post("/{id}/toggle", response_model=schemas.Task)
def toggle_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    toggle_in: schemas.TaskToggle,
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """Start or stop a task's scheduler job.

    Raises 404 if the task does not exist.
    """
    task = db.query(sql_models.Task).filter(sql_models.Task.id == id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    task.is_running = toggle_in.running
    db.commit()
    db.refresh(task)

    update_job(task.id, task.cron_expression, task.is_running)
    return task


@router.post("/test")
async def test_task(
    *,
    db: Session = Depends(get_db),
    task_in: schemas.TaskTest,
    current_user: sql_models.User = Depends(deps.get_current_user),
) -> Any:
    """Run the analysis pipeline once with an ad-hoc configuration.

    Results are returned directly and NOT persisted (save_to_db=False),
    both to avoid polluting the task logs and because the placeholder
    task_id=0 would violate the TaskLog foreign key if saved.
    """
    model_config = (
        db.query(sql_models.ModelConfig)
        .filter(sql_models.ModelConfig.id == task_in.model_config_id)
        .first()
    )
    if not model_config:
        raise HTTPException(status_code=404, detail="Model configuration not found")

    results = await run_analysis_pipeline(
        task_id=0,  # placeholder; unused because save_to_db=False
        camera_ids=task_in.camera_ids,
        model_config=model_config,
        rules=task_in.rules,
        save_to_db=False,
    )
    return {"status": "success", "results": results}