- from typing import Any, List
- from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
- from fastapi.responses import Response
- from sqlalchemy.orm import Session
- from backend.app.api import deps
- from backend.app.core.database import get_db
- from backend.app.models import sql_models
- from backend.app.schemas import schemas
- from backend.app.services.scheduler import update_job, run_analysis_pipeline
- import openpyxl
- import io
- import json
# Router for all task endpoints; the application mounts it under its URL prefix.
router = APIRouter()
@router.get("", response_model=List[schemas.Task])
def read_tasks(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """Return a page of tasks, offset by ``skip`` and capped at ``limit``."""
    query = db.query(sql_models.Task)
    return query.offset(skip).limit(limit).all()
@router.get("/export_template")
def export_template(
    db: Session = Depends(get_db),
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """
    Export task configuration template (including current data)
    """
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "Tasks"

    # Header row first, one data row per existing task below it.
    sheet.append(["Task Name", "Model Config Name", "Camera Names", "Cron Expression", "Rules (JSON)"])

    # One camera query up front so each row does a dict lookup, not a DB hit.
    camera_lookup = {cam.id: cam.name for cam in db.query(sql_models.Camera).all()}

    for task in db.query(sql_models.Task).all():
        model_name = task.model_config.name if task.model_config else ""
        # Unknown/stale camera ids are silently dropped from the export.
        names = [camera_lookup[cid] for cid in (task.camera_ids or []) if cid in camera_lookup]
        rules_json = json.dumps(task.rules, ensure_ascii=False) if task.rules else "[]"
        sheet.append([task.name, model_name, ",".join(names), task.cron_expression, rules_json])

    # Widen columns so the template is readable when opened in Excel.
    for column, width in zip("ABCDE", (25, 25, 40, 20, 50)):
        sheet.column_dimensions[column].width = width

    stream = io.BytesIO()
    workbook.save(stream)
    stream.seek(0)

    return Response(
        content=stream.getvalue(),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": "attachment; filename=tasks_template.xlsx"}
    )
@router.post("/import_template")
async def import_template(
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """
    Import tasks from an Excel template.

    Rows are matched to existing tasks by name: matches are updated in place
    (and their scheduler job refreshed if running), non-matches are created.
    Rows with an empty task name or an unknown model config name are skipped.

    Returns a summary dict with counts of added and updated tasks.
    Raises HTTPException(400) on a bad file type, a malformed template, or
    any processing failure.
    """
    # file.filename may be None for some clients; treat that as invalid too.
    if not file.filename or not file.filename.endswith(('.xlsx', '.xls')):
        raise HTTPException(status_code=400, detail="Invalid file format. Please upload an Excel file.")

    try:
        contents = await file.read()
        wb = openpyxl.load_workbook(io.BytesIO(contents))
        ws = wb.active

        rows = list(ws.rows)
        if not rows:
            return {"message": "Empty file"}

        headers = [cell.value for cell in rows[0]]

        # Validate headers before trusting any column positions.
        required_headers = ["Task Name", "Model Config Name", "Camera Names", "Cron Expression", "Rules (JSON)"]
        if not all(h in headers for h in required_headers):
            raise HTTPException(status_code=400, detail=f"Invalid template format. Required columns: {', '.join(required_headers)}")

        name_idx = headers.index("Task Name")
        model_idx = headers.index("Model Config Name")
        cams_idx = headers.index("Camera Names")
        cron_idx = headers.index("Cron Expression")
        rules_idx = headers.index("Rules (JSON)")

        added_count = 0
        updated_count = 0

        # Pre-fetch lookup maps so the row loop never queries the DB.
        # Model Configs: Name -> ID
        model_map = {m.name: m.id for m in db.query(sql_models.ModelConfig).all()}
        # Cameras: Name -> ID
        camera_map = {c.name: c.id for c in db.query(sql_models.Camera).all()}
        # Tasks: Name -> ORM instance; newly created tasks are added to this
        # map so a duplicate name later in the file updates instead of
        # inserting twice (matches the previous autoflush-dependent behavior).
        task_map = {t.name: t for t in db.query(sql_models.Task).all()}

        for row in rows[1:]:
            task_name = row[name_idx].value
            model_config_name = row[model_idx].value
            camera_names_str = row[cams_idx].value
            cron_expr = row[cron_idx].value
            rules_str = row[rules_idx].value

            if not task_name:
                continue

            # Resolve model config by name; skip rows we cannot resolve.
            model_config_id = model_map.get(model_config_name)
            if not model_config_id:
                continue

            # Resolve camera names to ids, ignoring unknown names.
            camera_ids = []
            if camera_names_str:
                cam_names = [c.strip() for c in str(camera_names_str).split(",")]
                camera_ids = [camera_map[n] for n in cam_names if n in camera_map]

            # Parse rules JSON; malformed or non-string cells fall back to [].
            try:
                rules = json.loads(rules_str) if rules_str else []
            except (TypeError, ValueError):
                rules = []

            existing_task = task_map.get(task_name)

            if existing_task:
                existing_task.model_config_id = model_config_id
                existing_task.camera_ids = camera_ids
                existing_task.rules = rules
                existing_task.cron_expression = cron_expr

                # Keep the live scheduler job in sync with the new cron.
                if existing_task.is_running:
                    update_job(existing_task.id, existing_task.cron_expression, True)

                updated_count += 1
            else:
                new_task = sql_models.Task(
                    name=task_name,
                    model_config_id=model_config_id,
                    camera_ids=camera_ids,
                    rules=rules,
                    cron_expression=cron_expr,
                    is_running=False
                )
                db.add(new_task)
                task_map[task_name] = new_task
                added_count += 1

        db.commit()

        return {
            "message": "Import successful",
            "added": added_count,
            "updated": updated_count
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Import failed: {str(e)}")
@router.post("", response_model=schemas.Task)
def create_task(
    *,
    db: Session = Depends(get_db),
    task_in: schemas.TaskCreate,
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """Create a task from the submitted payload and return the stored row."""
    new_task = sql_models.Task(**task_in.model_dump())
    db.add(new_task)
    db.commit()
    # Refresh to pick up DB-generated values (e.g. the primary key).
    db.refresh(new_task)
    return new_task
@router.put("/{id}", response_model=schemas.Task)
def update_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    task_in: schemas.TaskUpdate,
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """
    Update an existing task and refresh its scheduler job if it is running.

    Raises HTTPException(404) when no task has the given id.
    """
    task = db.query(sql_models.Task).filter(sql_models.Task.id == id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # exclude_unset=True: only apply fields the client actually sent, so a
    # partial payload cannot null out columns the caller never mentioned.
    for key, value in task_in.model_dump(exclude_unset=True).items():
        setattr(task, key, value)

    db.commit()
    db.refresh(task)

    # Re-register the job so a changed cron expression takes effect.
    if task.is_running:
        update_job(task.id, task.cron_expression, True)

    return task
@router.delete("/{id}", response_model=schemas.Task)
def delete_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """
    Delete a task, its scheduler job, and its logs; return the deleted task.

    Raises HTTPException(404) when no task has the given id.
    """
    task = db.query(sql_models.Task).filter(sql_models.Task.id == id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # Stop the scheduled job before the row disappears.
    update_job(task.id, task.cron_expression, False)

    # Snapshot the response BEFORE deleting: with the default
    # expire_on_commit=True the ORM instance expires on commit, and
    # serializing a deleted row would raise ObjectDeletedError.
    task_out = schemas.Task.model_validate(task)

    # Delete associated logs first to satisfy the FK constraint.
    db.query(sql_models.TaskLog).filter(sql_models.TaskLog.task_id == id).delete()

    db.delete(task)
    db.commit()
    return task_out
@router.post("/{id}/toggle", response_model=schemas.Task)
def toggle_task(
    *,
    db: Session = Depends(get_db),
    id: int,
    toggle_in: schemas.TaskToggle,
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """Start or stop a task and sync the scheduler with the persisted flag."""
    task = db.query(sql_models.Task).filter(sql_models.Task.id == id).first()
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    task.is_running = toggle_in.running
    db.commit()
    db.refresh(task)

    # Register or remove the job to match the new running state.
    update_job(task.id, task.cron_expression, task.is_running)

    return task
@router.post("/test")
async def test_task(
    *,
    db: Session = Depends(get_db),
    task_in: schemas.TaskTest,
    current_user: sql_models.User = Depends(deps.get_current_user)
) -> Any:
    """
    Run the analysis pipeline once with the given configuration, without
    persisting anything, so a task can be validated before it is saved.

    Raises HTTPException(404) when the referenced model config is missing.
    """
    # Resolve the model configuration the test run should use.
    model_config = db.query(sql_models.ModelConfig).filter(
        sql_models.ModelConfig.id == task_in.model_config_id
    ).first()
    if not model_config:
        raise HTTPException(status_code=404, detail="Model configuration not found")

    # save_to_db=False: a test run has no real task row, so persisting logs
    # (keyed by task_id) would violate the TaskLog foreign key and pollute
    # the history with throwaway entries.
    results = await run_analysis_pipeline(
        task_id=0,  # placeholder; unused when save_to_db=False
        camera_ids=task_in.camera_ids,
        model_config=model_config,
        rules=task_in.rules,
        save_to_db=False
    )

    return {"status": "success", "results": results}
|