| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from pydantic import BaseModel, field_validator
- from typing import Optional, List, Any
- from datetime import datetime
- from apscheduler.triggers.cron import CronTrigger
- # Auth
- class Token(BaseModel):
- access_token: str
- token_type: str
- is_superuser: bool = False
- username: str = ""
- class TokenData(BaseModel):
- username: Optional[str] = None
- class SimpleAuthCallback(BaseModel):
- ticket: str
- # User
- class UserBase(BaseModel):
- username: str
- is_active: Optional[bool] = True
- is_superuser: Optional[bool] = False
- class UserCreate(UserBase):
- password: str
- class UserUpdate(BaseModel):
- password: Optional[str] = None
- is_active: Optional[bool] = None
- is_superuser: Optional[bool] = None
- class User(UserBase):
- id: int
-
- class Config:
- from_attributes = True
- class UserLogin(BaseModel):
- username: str
- password: str
- # Camera
- class CameraBase(BaseModel):
- name: str
- stream_url: str
- class CameraCreate(CameraBase):
- pass
- class CameraUpdate(CameraBase):
- pass
- class CameraTest(BaseModel):
- stream_url: str
- class Camera(CameraBase):
- id: int
- status: int
- created_at: datetime
-
- class Config:
- from_attributes = True
- # Model Config
- class ModelConfigBase(BaseModel):
- name: str
- base_url: str
- api_key: str
- model_name: str
- class ModelConfigCreate(ModelConfigBase):
- pass
- class ModelConfig(ModelConfigBase):
- id: int
-
- class Config:
- from_attributes = True
- # Task
- def validate_cron_helper(v: str) -> str:
- # 1. Try standard 5-field cron
- try:
- CronTrigger.from_crontab(v)
- return v
- except ValueError:
- pass
-
- # 2. Try 6-field cron (second minute hour day month day_of_week)
- try:
- parts = v.split()
- if len(parts) == 6:
- CronTrigger(
- second=parts[0],
- minute=parts[1],
- hour=parts[2],
- day=parts[3],
- month=parts[4],
- day_of_week=parts[5]
- )
- return v
- except Exception:
- pass
- raise ValueError("Invalid cron expression. Expected 5 fields (standard) or 6 fields (with seconds).")
- class TaskRule(BaseModel):
- name: str
- prompt: str
- class TaskBase(BaseModel):
- name: str
- model_config_id: int
- camera_ids: List[int]
- rules: List[TaskRule]
- cron_expression: str
- class TaskCreate(TaskBase):
- @field_validator('cron_expression')
- @classmethod
- def validate_cron(cls, v: str) -> str:
- return validate_cron_helper(v)
- class TaskUpdate(TaskBase):
- @field_validator('cron_expression')
- @classmethod
- def validate_cron(cls, v: str) -> str:
- return validate_cron_helper(v)
- class TaskTest(BaseModel):
- model_config_id: int
- camera_ids: List[int]
- rules: List[TaskRule]
- class Task(TaskBase):
- id: int
- is_running: bool
-
- class Config:
- from_attributes = True
- class TaskToggle(BaseModel):
- running: bool
- # Log
- class TaskLogBase(BaseModel):
- task_id: int
- camera_id: int
- snapshot_path: str
- alarm_name: Optional[str] = None
- alarm_content: Optional[str] = None
- area: Optional[str] = None
- is_alarm: bool
- class TaskLog(TaskLogBase):
- id: int
- check_time: datetime
-
- class Config:
- from_attributes = True
- # Duty Report
- class DutyReportBase(BaseModel):
- reporter_name: Optional[str] = None
- start_time: datetime
- end_time: datetime
- class DutyReportCreate(DutyReportBase):
- pass
- class DutyReport(DutyReportBase):
- id: int
- status: str
- file_path: Optional[str] = None
- created_at: datetime
-
- class Config:
- from_attributes = True
|