music_cache_scanner.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """音乐缓存扫描器 扫描cache/music目录中的音乐文件,提取元数据,生成本地歌单.
  4. 依赖安装: pip install mutagen
  5. """
  6. import hashlib
  7. import json
  8. import sys
  9. from datetime import datetime
  10. from pathlib import Path
  11. from typing import Dict, List, Optional
  12. try:
  13. from mutagen import File as MutagenFile
  14. from mutagen.id3 import ID3NoHeaderError
  15. except ImportError:
  16. print("错误: 需要安装 mutagen 库")
  17. print("请运行: pip install mutagen")
  18. sys.exit(1)
# Project root directory: the parent of the directory that contains this file.
PROJECT_ROOT = Path(__file__).parent.parent
  21. class MusicMetadata:
  22. """
  23. 音乐元数据类.
  24. """
  25. def __init__(self, file_path: Path):
  26. self.file_path = file_path
  27. self.filename = file_path.name
  28. self.file_id = file_path.stem # 文件名去掉扩展名,即歌曲ID
  29. self.file_size = file_path.stat().st_size
  30. self.creation_time = datetime.fromtimestamp(file_path.stat().st_ctime)
  31. self.modification_time = datetime.fromtimestamp(file_path.stat().st_mtime)
  32. # 从文件提取的元数据
  33. self.title = None
  34. self.artist = None
  35. self.album = None
  36. self.genre = None
  37. self.year = None
  38. self.duration = None # 秒数
  39. self.bitrate = None
  40. self.sample_rate = None
  41. # 文件哈希(用于去重)
  42. self.file_hash = self._calculate_hash()
  43. def _calculate_hash(self) -> str:
  44. """
  45. 计算文件MD5哈希值(仅前1MB避免大文件计算过慢)
  46. """
  47. try:
  48. hash_md5 = hashlib.md5()
  49. with open(self.file_path, "rb") as f:
  50. # 只读取前1MB计算哈希
  51. chunk = f.read(1024 * 1024)
  52. hash_md5.update(chunk)
  53. return hash_md5.hexdigest()[:16] # 取前16位
  54. except Exception:
  55. return "unknown"
  56. def extract_metadata(self) -> bool:
  57. """
  58. 提取音乐文件元数据.
  59. """
  60. try:
  61. audio_file = MutagenFile(self.file_path)
  62. if audio_file is None:
  63. return False
  64. # 基本信息
  65. if hasattr(audio_file, "info"):
  66. self.duration = getattr(audio_file.info, "length", None)
  67. self.bitrate = getattr(audio_file.info, "bitrate", None)
  68. self.sample_rate = getattr(audio_file.info, "sample_rate", None)
  69. # ID3标签信息
  70. tags = audio_file.tags if audio_file.tags else {}
  71. # 标题
  72. self.title = self._get_tag_value(tags, ["TIT2", "TITLE", "\xa9nam"])
  73. # 艺术家
  74. self.artist = self._get_tag_value(tags, ["TPE1", "ARTIST", "\xa9ART"])
  75. # 专辑
  76. self.album = self._get_tag_value(tags, ["TALB", "ALBUM", "\xa9alb"])
  77. # 流派
  78. self.genre = self._get_tag_value(tags, ["TCON", "GENRE", "\xa9gen"])
  79. # 年份
  80. year_raw = self._get_tag_value(tags, ["TDRC", "DATE", "YEAR", "\xa9day"])
  81. if year_raw:
  82. # 提取年份数字
  83. year_str = str(year_raw)
  84. if year_str.isdigit():
  85. self.year = int(year_str)
  86. else:
  87. # 尝试从日期字符串中提取年份
  88. import re
  89. year_match = re.search(r"(\d{4})", year_str)
  90. if year_match:
  91. self.year = int(year_match.group(1))
  92. return True
  93. except ID3NoHeaderError:
  94. # 没有ID3标签,不是错误
  95. return True
  96. except Exception as e:
  97. print(f"提取元数据失败 {self.filename}: {e}")
  98. return False
  99. def _get_tag_value(self, tags: dict, tag_names: List[str]) -> Optional[str]:
  100. """
  101. 从多个可能的标签名中获取值.
  102. """
  103. for tag_name in tag_names:
  104. if tag_name in tags:
  105. value = tags[tag_name]
  106. if isinstance(value, list) and value:
  107. return str(value[0])
  108. elif value:
  109. return str(value)
  110. return None
  111. def format_duration(self) -> str:
  112. """
  113. 格式化播放时长.
  114. """
  115. if self.duration is None:
  116. return "未知"
  117. minutes = int(self.duration) // 60
  118. seconds = int(self.duration) % 60
  119. return f"{minutes:02d}:{seconds:02d}"
  120. def format_file_size(self) -> str:
  121. """
  122. 格式化文件大小.
  123. """
  124. size = self.file_size
  125. for unit in ["B", "KB", "MB", "GB"]:
  126. if size < 1024.0:
  127. return f"{size:.1f} {unit}"
  128. size /= 1024.0
  129. return f"{size:.1f} TB"
  130. def to_dict(self) -> Dict:
  131. """
  132. 转换为字典格式.
  133. """
  134. return {
  135. "file_id": self.file_id,
  136. "filename": self.filename,
  137. "title": self.title,
  138. "artist": self.artist,
  139. "album": self.album,
  140. "genre": self.genre,
  141. "year": self.year,
  142. "duration": self.duration,
  143. "duration_formatted": self.format_duration(),
  144. "bitrate": self.bitrate,
  145. "sample_rate": self.sample_rate,
  146. "file_size": self.file_size,
  147. "file_size_formatted": self.format_file_size(),
  148. "file_hash": self.file_hash,
  149. "creation_time": self.creation_time.isoformat(),
  150. "modification_time": self.modification_time.isoformat(),
  151. }
  152. class MusicCacheScanner:
  153. """
  154. 音乐缓存扫描器.
  155. """
  156. def __init__(self, cache_dir: Path = None):
  157. self.cache_dir = cache_dir or PROJECT_ROOT / "cache" / "music"
  158. self.playlist: List[MusicMetadata] = []
  159. self.scan_stats = {
  160. "total_files": 0,
  161. "success_count": 0,
  162. "error_count": 0,
  163. "total_duration": 0,
  164. "total_size": 0,
  165. }
  166. def scan_cache(self) -> bool:
  167. """
  168. 扫描缓存目录.
  169. """
  170. print(f"🎵 开始扫描音乐缓存目录: {self.cache_dir}")
  171. if not self.cache_dir.exists():
  172. print(f"❌ 缓存目录不存在: {self.cache_dir}")
  173. return False
  174. # 查找所有音乐文件
  175. music_files = []
  176. for pattern in ["*.mp3", "*.m4a", "*.flac", "*.wav", "*.ogg"]:
  177. music_files.extend(self.cache_dir.glob(pattern))
  178. if not music_files:
  179. print("📁 缓存目录中没有找到音乐文件")
  180. return False
  181. self.scan_stats["total_files"] = len(music_files)
  182. print(f"📊 找到 {len(music_files)} 个音乐文件")
  183. # 扫描每个文件
  184. for i, file_path in enumerate(music_files, 1):
  185. print(f"🔍 [{i}/{len(music_files)}] 扫描: {file_path.name}")
  186. try:
  187. metadata = MusicMetadata(file_path)
  188. if metadata.extract_metadata():
  189. self.playlist.append(metadata)
  190. self.scan_stats["success_count"] += 1
  191. # 累计统计
  192. if metadata.duration:
  193. self.scan_stats["total_duration"] += metadata.duration
  194. self.scan_stats["total_size"] += metadata.file_size
  195. # 显示基本信息
  196. display_title = metadata.title or "未知标题"
  197. display_artist = metadata.artist or "未知艺术家"
  198. print(
  199. f" ✅ {display_title} - {display_artist} ({metadata.format_duration()})"
  200. )
  201. else:
  202. self.scan_stats["error_count"] += 1
  203. print(" ❌ 元数据提取失败")
  204. except Exception as e:
  205. self.scan_stats["error_count"] += 1
  206. print(f" ❌ 处理失败: {e}")
  207. return True
  208. def remove_duplicates(self):
  209. """
  210. 移除重复的音乐文件(基于哈希值)
  211. """
  212. seen_hashes = set()
  213. unique_playlist = []
  214. duplicates = []
  215. for metadata in self.playlist:
  216. if metadata.file_hash in seen_hashes:
  217. duplicates.append(metadata)
  218. else:
  219. seen_hashes.add(metadata.file_hash)
  220. unique_playlist.append(metadata)
  221. if duplicates:
  222. print(f"🔄 发现 {len(duplicates)} 个重复文件:")
  223. for dup in duplicates:
  224. print(f" - {dup.filename}")
  225. self.playlist = unique_playlist
  226. def sort_playlist(self, sort_by: str = "artist"):
  227. """
  228. 排序歌单.
  229. """
  230. sort_functions = {
  231. "artist": lambda x: (
  232. x.artist or "Unknown",
  233. x.album or "Unknown",
  234. x.title or "Unknown",
  235. ),
  236. "title": lambda x: x.title or "Unknown",
  237. "album": lambda x: (x.album or "Unknown", x.artist or "Unknown"),
  238. "duration": lambda x: x.duration or 0,
  239. "file_size": lambda x: x.file_size,
  240. "creation_time": lambda x: x.creation_time,
  241. }
  242. if sort_by in sort_functions:
  243. self.playlist.sort(key=sort_functions[sort_by])
  244. print(f"📋 歌单已按 {sort_by} 排序")
  245. def print_statistics(self):
  246. """
  247. 打印扫描统计信息.
  248. """
  249. stats = self.scan_stats
  250. print("\n📊 扫描统计:")
  251. print(f" 总文件数: {stats['total_files']}")
  252. print(f" 成功处理: {stats['success_count']}")
  253. print(f" 处理失败: {stats['error_count']}")
  254. print(f" 成功率: {stats['success_count']/stats['total_files']*100:.1f}%")
  255. # 总时长
  256. total_hours = stats["total_duration"] // 3600
  257. total_minutes = (stats["total_duration"] % 3600) // 60
  258. print(f" 总播放时长: {total_hours}小时{total_minutes}分钟")
  259. # 总大小
  260. total_size_mb = stats["total_size"] / (1024 * 1024)
  261. print(f" 总文件大小: {total_size_mb:.1f} MB")
  262. # 平均信息
  263. if stats["success_count"] > 0:
  264. avg_duration = stats["total_duration"] / stats["success_count"]
  265. avg_size = stats["total_size"] / stats["success_count"]
  266. print(f" 平均时长: {int(avg_duration//60)}:{int(avg_duration%60):02d}")
  267. print(f" 平均大小: {avg_size/(1024*1024):.1f} MB")
  268. def print_playlist(self, limit: int = None):
  269. """
  270. 打印歌单.
  271. """
  272. print(f"\n🎵 本地音乐歌单 (共 {len(self.playlist)} 首)")
  273. print("=" * 80)
  274. for i, metadata in enumerate(
  275. self.playlist[:limit] if limit else self.playlist, 1
  276. ):
  277. title = metadata.title or "未知标题"
  278. artist = metadata.artist or "未知艺术家"
  279. album = metadata.album or "未知专辑"
  280. duration = metadata.format_duration()
  281. print(f"{i:3d}. {title}")
  282. print(f" 艺术家: {artist}")
  283. print(f" 专辑: {album}")
  284. print(f" 时长: {duration} | 文件ID: {metadata.file_id}")
  285. print()
  286. if limit and len(self.playlist) > limit:
  287. print(f"... 还有 {len(self.playlist) - limit} 首歌曲")
  288. def export_playlist(self, output_file: Path = None, format: str = "json"):
  289. """
  290. 导出歌单.
  291. """
  292. if not output_file:
  293. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  294. output_file = PROJECT_ROOT / f"local_playlist_{timestamp}.{format}"
  295. try:
  296. if format == "json":
  297. playlist_data = {
  298. "metadata": {
  299. "generated_at": datetime.now().isoformat(),
  300. "cache_directory": str(self.cache_dir),
  301. "total_songs": len(self.playlist),
  302. "statistics": self.scan_stats,
  303. },
  304. "playlist": [metadata.to_dict() for metadata in self.playlist],
  305. }
  306. with open(output_file, "w", encoding="utf-8") as f:
  307. json.dump(playlist_data, f, ensure_ascii=False, indent=2)
  308. elif format == "m3u":
  309. with open(output_file, "w", encoding="utf-8") as f:
  310. f.write("#EXTM3U\n")
  311. for metadata in self.playlist:
  312. title = metadata.title or metadata.filename
  313. artist = metadata.artist or "Unknown Artist"
  314. duration = int(metadata.duration) if metadata.duration else -1
  315. f.write(f"#EXTINF:{duration},{artist} - {title}\n")
  316. f.write(f"{metadata.file_path}\n")
  317. print(f"📄 歌单已导出到: {output_file}")
  318. return output_file
  319. except Exception as e:
  320. print(f"❌ 导出失败: {e}")
  321. return None
  322. def search_songs(self, query: str) -> List[MusicMetadata]:
  323. """
  324. 搜索歌曲.
  325. """
  326. query = query.lower()
  327. results = []
  328. for metadata in self.playlist:
  329. # 在标题、艺术家、专辑中搜索
  330. searchable_text = " ".join(
  331. filter(
  332. None,
  333. [
  334. metadata.title,
  335. metadata.artist,
  336. metadata.album,
  337. metadata.filename,
  338. ],
  339. )
  340. ).lower()
  341. if query in searchable_text:
  342. results.append(metadata)
  343. return results
  344. def get_artists(self) -> Dict[str, List[MusicMetadata]]:
  345. """
  346. 按艺术家分组.
  347. """
  348. artists = {}
  349. for metadata in self.playlist:
  350. artist = metadata.artist or "未知艺术家"
  351. if artist not in artists:
  352. artists[artist] = []
  353. artists[artist].append(metadata)
  354. return artists
  355. def get_albums(self) -> Dict[str, List[MusicMetadata]]:
  356. """
  357. 按专辑分组.
  358. """
  359. albums = {}
  360. for metadata in self.playlist:
  361. album_key = (
  362. f"{metadata.album or '未知专辑'} - {metadata.artist or '未知艺术家'}"
  363. )
  364. if album_key not in albums:
  365. albums[album_key] = []
  366. albums[album_key].append(metadata)
  367. return albums
  368. def main():
  369. """
  370. 主函数.
  371. """
  372. print("🎵 音乐缓存扫描器")
  373. print("=" * 50)
  374. # 创建扫描器
  375. scanner = MusicCacheScanner()
  376. # 扫描缓存
  377. if not scanner.scan_cache():
  378. return
  379. # 移除重复文件
  380. scanner.remove_duplicates()
  381. # 排序歌单
  382. scanner.sort_playlist("artist")
  383. # 显示统计信息
  384. scanner.print_statistics()
  385. # 显示歌单(限制前20首)
  386. scanner.print_playlist(limit=20)
  387. # 交互菜单
  388. while True:
  389. print("\n" + "=" * 50)
  390. print("选择操作:")
  391. print("1. 显示完整歌单")
  392. print("2. 按艺术家分组显示")
  393. print("3. 按专辑分组显示")
  394. print("4. 搜索歌曲")
  395. print("5. 导出歌单 (JSON)")
  396. print("6. 导出歌单 (M3U)")
  397. print("7. 重新排序")
  398. print("0. 退出")
  399. choice = input("\n请选择 (0-7): ").strip()
  400. if choice == "0":
  401. break
  402. elif choice == "1":
  403. scanner.print_playlist()
  404. elif choice == "2":
  405. artists = scanner.get_artists()
  406. for artist, songs in artists.items():
  407. print(f"\n🎤 {artist} ({len(songs)} 首)")
  408. for song in songs:
  409. title = song.title or song.filename
  410. print(f" - {title} ({song.format_duration()})")
  411. elif choice == "3":
  412. albums = scanner.get_albums()
  413. for album, songs in albums.items():
  414. print(f"\n💿 {album} ({len(songs)} 首)")
  415. for song in songs:
  416. title = song.title or song.filename
  417. print(f" - {title} ({song.format_duration()})")
  418. elif choice == "4":
  419. query = input("请输入搜索关键词: ").strip()
  420. if query:
  421. results = scanner.search_songs(query)
  422. if results:
  423. print(f"\n🔍 找到 {len(results)} 首歌曲:")
  424. for i, song in enumerate(results, 1):
  425. title = song.title or song.filename
  426. artist = song.artist or "未知艺术家"
  427. print(f" {i}. {title} - {artist} ({song.format_duration()})")
  428. else:
  429. print("🔍 没有找到匹配的歌曲")
  430. elif choice == "5":
  431. scanner.export_playlist(format="json")
  432. elif choice == "6":
  433. scanner.export_playlist(format="m3u")
  434. elif choice == "7":
  435. print("排序选项:")
  436. print("1. 按艺术家")
  437. print("2. 按标题")
  438. print("3. 按专辑")
  439. print("4. 按时长")
  440. print("5. 按文件大小")
  441. print("6. 按创建时间")
  442. sort_choice = input("请选择排序方式 (1-6): ").strip()
  443. sort_map = {
  444. "1": "artist",
  445. "2": "title",
  446. "3": "album",
  447. "4": "duration",
  448. "5": "file_size",
  449. "6": "creation_time",
  450. }
  451. if sort_choice in sort_map:
  452. scanner.sort_playlist(sort_map[sort_choice])
  453. print("✅ 排序完成")
  454. else:
  455. print("❌ 无效选择")
  456. print("\n👋 再见!")
  457. if __name__ == "__main__":
  458. try:
  459. main()
  460. except KeyboardInterrupt:
  461. print("\n\n👋 用户中断,退出程序")
  462. except Exception as e:
  463. print(f"\n❌ 程序异常: {e}")
  464. import traceback
  465. traceback.print_exc()