# llm_agent.py — async helpers for vision-based alarm analysis via an OpenAI-compatible LLM endpoint.
  1. import base64
  2. import json
  3. from openai import AsyncOpenAI
  4. import logging
  5. from typing import List, Any
  6. logger = logging.getLogger(__name__)
  7. async def analyze_image(
  8. base_url: str,
  9. api_key: str,
  10. model_name: str,
  11. rules: List[Any],
  12. image_bytes: bytes
  13. ) -> str:
  14. try:
  15. client = AsyncOpenAI(
  16. base_url=base_url,
  17. api_key=api_key,
  18. )
  19. # Prepare rules text
  20. # Handle both dicts and Pydantic models
  21. rules_data = []
  22. for r in rules:
  23. if hasattr(r, 'model_dump'):
  24. rules_data.append(r.model_dump())
  25. elif hasattr(r, 'dict'):
  26. rules_data.append(r.dict())
  27. else:
  28. rules_data.append(r)
  29. rules_text = json.dumps(rules_data, ensure_ascii=False, indent=2)
  30. system_prompt = """你是一个智能安防助手。
  31. 请根据提供的监控画面和告警规则列表,逐项检查是否触发告警。
  32. 请严格按照 JSON 格式返回结果,返回一个对象列表。
  33. 每个对象包含以下字段:
  34. - "alarm_name": 告警规则名称 (String)
  35. - "alarm_content": 详细描述发现的情况,如果没有异常请填 "正常" (String)
  36. - "is_alarm": 是否触发告警 (Boolean, true/false)
  37. - "area": 异常发生的区域描述,如果无异常可填 "全局" (String)
  38. 不要包含 markdown 格式标记(如 ```json),直接返回 JSON 字符串。
  39. """
  40. user_content = f"请根据以下规则检查画面:\n{rules_text}"
  41. b64_image = base64.b64encode(image_bytes).decode('utf-8')
  42. response = await client.chat.completions.create(
  43. model=model_name,
  44. messages=[
  45. {
  46. "role": "system",
  47. "content": system_prompt
  48. },
  49. {
  50. "role": "user",
  51. "content": [
  52. {"type": "text", "text": user_content},
  53. {
  54. "type": "image_url",
  55. "image_url": {
  56. "url": f"data:image/jpeg;base64,{b64_image}"
  57. },
  58. },
  59. ],
  60. }
  61. ],
  62. max_tokens=1000
  63. )
  64. return response.choices[0].message.content
  65. except Exception as e:
  66. logger.error(f"LLM Error: {e}")
  67. return json.dumps([{"alarm_name": "System Error", "alarm_content": str(e), "is_alarm": True, "time": "", "area": ""}])
  68. async def test_connection(
  69. base_url: str,
  70. api_key: str,
  71. model_name: str
  72. ) -> str:
  73. try:
  74. client = AsyncOpenAI(
  75. base_url=base_url,
  76. api_key=api_key,
  77. )
  78. response = await client.chat.completions.create(
  79. model=model_name,
  80. messages=[
  81. {"role": "user", "content": "Hello, this is a connection test."}
  82. ],
  83. max_tokens=10
  84. )
  85. return response.choices[0].message.content
  86. except Exception as e:
  87. logger.error(f"LLM Test Error: {e}")
  88. raise e