import datetime
import html
import smtplib
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr

import yaml
from apscheduler.schedulers.background import BackgroundScheduler

from application.self_check_service import run_all_checks
from application.uap_message_service import send_self_check_notification
from utils.logger_config import logger


class SelfCheckScheduler:
    """Run the device self-check on a daily schedule and deliver its report.

    A ``BackgroundScheduler`` cron job (07:50) calls ``run_daily_check``,
    which runs all checks, e-mails an HTML report and pushes a short text
    summary through ``send_self_check_notification``.
    """

    # Key in the check results -> display name used in the report.
    # Shared by the HTML report and the text summary so they cannot drift.
    MODULES = {
        "kodi": "电视系统 (Kodi)",
        "door": "门禁系统",
        "led": "展品灯座",
        "ha": "展厅灯光 (HA)",
        "pc": "展厅PC",
    }

    # Seconds to wait on the SMTP socket; without a timeout a stalled mail
    # server would block the scheduler's job thread indefinitely.
    SMTP_TIMEOUT = 30

    def __init__(self):
        """Create the scheduler and register the daily 07:50 cron job."""
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_job(self.run_daily_check, 'cron', hour=7, minute=50)

    def start(self):
        """Start the background scheduler; failures are logged, not raised."""
        try:
            self.scheduler.start()
            logger.info("自检定时任务调度器已启动 (每天 07:50)")
        except Exception as e:
            logger.error(f"启动定时任务失败: {e}")

    def load_email_config(self) -> dict:
        """Return the ``email`` section of ``email_config.yaml``.

        The path is relative, so it is resolved against the process working
        directory. Returns an empty dict when the file cannot be read or
        parsed, is empty, or has no usable ``email`` section.
        """
        try:
            with open('email_config.yaml', 'r', encoding='utf-8') as f:
                # safe_load returns None for an empty document.
                loaded = yaml.safe_load(f) or {}
            return loaded.get('email') or {}
        except Exception as e:
            logger.error(f"读取邮件配置失败: {e}")
            return {}

    @staticmethod
    def _escape(value) -> str:
        """Return ``value`` as text that is safe to embed in HTML."""
        return html.escape(str(value))

    def _count_devices(self, results) -> tuple:
        """Return ``(total, online, offline)`` over all known modules."""
        total = online = 0
        for key in self.MODULES:
            # A module may be missing or explicitly None; treat both as empty.
            for device in results.get(key) or []:
                total += 1
                if device.get('is_online'):
                    online += 1
        return total, online, total - online

    def format_report_html(self, results) -> str:
        """Build the HTML self-check report.

        Args:
            results: Mapping of module key (see ``MODULES``) to a list of
                device dicts. The keys read from each device are ``name``,
                ``id``, ``is_online``, ``ip``, ``entity_id`` and ``error``.

        Returns:
            An HTML fragment with a title, generation time, overall totals
            and one table per module. Device-supplied values are escaped.
        """
        generated_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        sections = []
        for key, name in self.MODULES.items():
            device_list = results.get(key) or []
            module_online = 0
            rows = []
            for device in device_list:
                is_online = device.get('is_online', False)
                if is_online:
                    module_online += 1

                status_cls = "online" if is_online else "offline"
                status_text = "在线" if is_online else "离线"
                # Inline colour as well as the class: many mail clients
                # drop <style> blocks, so a class alone would not render.
                status_color = "#2e7d32" if is_online else "#c62828"

                extra_info = []
                if device.get('ip'):
                    extra_info.append(f"IP: {device.get('ip')}")
                if device.get('entity_id'):
                    extra_info.append(f"Entity: {device.get('entity_id')}")
                if device.get('error'):
                    extra_info.append(f"Error: {device.get('error')}")

                device_name = self._escape(device.get('name', 'Unknown'))
                device_id = self._escape(device.get('id'))
                extra_text = self._escape(', '.join(extra_info))
                rows.append(
                    "<tr>"
                    f"<td>{device_name} (ID: {device_id})</td>"
                    f'<td class="{status_cls}" style="color: {status_color};">'
                    f"{status_text}</td>"
                    f"<td>{extra_text}</td>"
                    "</tr>"
                )

            if not rows:
                rows.append('<tr><td colspan="3">未配置设备或检测失败</td></tr>')

            table_rows = "\n".join(rows)
            sections.append(
                f"<h3>{name} - 在线: {module_online}/{len(device_list)}</h3>\n"
                '<table border="1" cellpadding="6" cellspacing="0">\n'
                f"{table_rows}\n"
                "</table>"
            )

        parts = [
            "<h2>🛡️ 系统设备自检报告</h2>",
            f"<p>生成时间: {generated_at}</p>",
            f"<p>总览: {self._format_summary_text(results)}</p>",
        ]
        parts.extend(sections)
        return "\n".join(parts)

    def _format_summary_text(self, results) -> str:
        """Return a one-line text summary of the device totals."""
        total_devices, total_online, total_offline = self._count_devices(results)
        return f"总设备数 {total_devices},在线 {total_online},离线/异常 {total_offline}"

    def send_email(self, subject, content, receivers=None):
        """Send ``content`` as an HTML e-mail using the YAML mail settings.

        Args:
            subject: Subject line of the message.
            content: HTML body of the message.
            receivers: A single address or a list of addresses. When empty,
                the ``receivers`` entry of the mail configuration is used.

        Configuration problems and delivery failures are logged; nothing is
        raised to the caller.
        """
        config = self.load_email_config()
        if not config:
            logger.error("缺少邮件配置,取消发送")
            return

        smtp_server = config.get('smtp_server')
        smtp_port = config.get('smtp_port')
        username = config.get('username')
        password = config.get('password')
        sender = config.get('sender')

        # Fall back to the configured default receivers when none are given.
        if not receivers:
            receivers = config.get('receivers', [])
        # Accept a single address as well as a list of addresses.
        if isinstance(receivers, str):
            receivers = [receivers]

        use_ssl = config.get('use_ssl', True)

        if not all([smtp_server, username, password, sender, receivers]):
            logger.error("邮件配置不完整或无接收者")
            return

        # NOTE(review): with use_ssl disabled the login is sent without
        # STARTTLS; confirm that is acceptable for the target mail server.
        smtp_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP

        try:
            message = MIMEMultipart()
            # Only the display name may be RFC 2047-encoded. Wrapping the
            # whole address in Header(..., 'utf-8') produces an invalid
            # From/To header, so the addresses are kept as plain text.
            message['From'] = formataddr(parseaddr(sender))
            message['To'] = ", ".join(receivers)
            message['Subject'] = Header(subject, 'utf-8')
            message.attach(MIMEText(content, 'html', 'utf-8'))

            # The context manager closes the connection even when login or
            # sendmail raises, so a failed send no longer leaks the socket.
            with smtp_cls(smtp_server, smtp_port or 0,
                          timeout=self.SMTP_TIMEOUT) as server:
                server.login(username, password)
                server.sendmail(sender, receivers, message.as_string())
            logger.info(f"自检报告邮件已发送至: {receivers}")
        except Exception as e:
            logger.error(f"发送邮件失败: {e}")

    def run_daily_check(self):
        """Run all checks, e-mail the report and push the summary.

        This is the scheduled job entry point, so every exception is caught
        and logged here rather than propagated into the scheduler.
        """
        logger.info("开始执行每日自检任务...")
        try:
            results = run_all_checks()
            html_report = self.format_report_html(results)
            self.send_email("每日设备自检报告", html_report)

            # Push a notification to the operations staff on the unified
            # login platform.
            summary = self._format_summary_text(results)
            send_self_check_notification(summary)
        except Exception as e:
            logger.error(f"每日自检任务异常: {e}")
# Module-level singleton: importing this module builds the scheduler once.
scheduler_service = SelfCheckScheduler()


def start_scheduler() -> None:
    """Start the shared self-check scheduler instance."""
    scheduler_service.start()