"""
YouTube search and audio download via yt-dlp.

Search uses yt-dlp's built-in ytsearch (no API key needed).
Downloads extract audio as MP3 using ffmpeg post-processing.
"""

import asyncio
import re
import shutil
import uuid
from pathlib import Path
from typing import Any

import yt_dlp

from config import MUSIC_DIR, TEMP_DIR

# In-memory store of active download jobs  { job_id: { status, progress, ... } }
_jobs: dict[str, dict[str, Any]] = {}

# Strong references to in-flight download tasks. The event loop only keeps
# weak references to tasks, so a task nobody holds on to may be garbage
# collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


# ── Search ────────────────────────────────────────────────────────────────────

async def search_youtube(query: str, max_results: int = 5) -> list[dict[str, Any]]:
    """Return top N YouTube results for a query without downloading anything.

    The blocking yt-dlp call runs in the default executor so the event loop
    stays responsive.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,
        "skip_download": True,
    }
    search_query = f"ytsearch{max_results}:{query}"
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _yt_search, search_query, ydl_opts)


def _yt_search(query: str, opts: dict) -> list[dict[str, Any]]:
    """Run a yt-dlp search synchronously and shape each entry for the caller.

    Every returned dict has the keys ``videoId``, ``title``, ``channel``,
    ``duration``, ``thumbnailUrl`` and ``watchUrl``; all values are strings.
    """
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(query, download=False)

    entries = (info.get("entries") or []) if info else []
    results: list[dict[str, Any]] = []
    for entry in entries:
        if not entry:
            continue
        # Keys may be present with a None value, so `.get(key, "")` alone is
        # not enough to guarantee a string.
        video_id = entry.get("id") or ""
        results.append({
            "videoId": video_id,
            "title": entry.get("title") or "",
            "channel": entry.get("uploader") or entry.get("channel") or "",
            "duration": _format_duration(entry.get("duration")),
            "thumbnailUrl": f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
            "watchUrl": f"https://www.youtube.com/watch?v={video_id}",
        })
    return results


def _format_duration(seconds: int | float | None) -> str:
    """Format a duration in seconds as ``M:SS`` or ``H:MM:SS``.

    Returns an empty string when the duration is missing or zero.
    """
    if not seconds:
        return ""
    m, s = divmod(int(seconds), 60)
    h, m = divmod(m, 60)
    if h:
        return f"{h}:{m:02d}:{s:02d}"
    return f"{m}:{s:02d}"


# ── Download ──────────────────────────────────────────────────────────────────

async def start_download(
    video_id: str,
    artist: str,
    album: str,
    track_name: str,
    track_number: int | None,
    year: str,
    genre: str,
    artwork_url: str,
) -> str:
    """
    Kick off a background download + tag job. Returns a job_id to poll.
    Tagging runs inside the same background task after the download completes.
    """
    job_id = str(uuid.uuid4())
    _jobs[job_id] = {
        "status": "queued",
        "progress": 0,
        "filename": None,
        "error": None,
    }
    task = asyncio.create_task(
        _run_download(
            job_id, video_id, artist, album,
            track_name, track_number, year, genre, artwork_url,
        )
    )
    # Hold a reference until the task completes, then drop it.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return job_id


async def _run_download(
    job_id: str,
    video_id: str,
    artist: str,
    album: str,
    track_name: str,
    track_number: int | None,
    year: str,
    genre: str,
    artwork_url: str,
) -> None:
    """Download the audio, move it into the library, then tag it.

    Progress and outcome are reported through ``_jobs[job_id]``:
    ``status`` moves through "downloading" -> "tagging" -> "done", or ends at
    "error" with ``error`` holding the message. A tagging failure is recorded
    in ``error`` but the job still finishes as "done".
    """
    job = _jobs[job_id]
    job["status"] = "downloading"
    loop = asyncio.get_running_loop()

    try:
        import tagger  # local import to avoid circular deps

        safe_artist = _safe_name(artist)
        safe_album = _safe_name(album)
        safe_title = _safe_name(track_name)

        prefix = f"{track_number:02d} - " if track_number else ""
        filename_stem = f"{prefix}{safe_title}"

        dest_dir = MUSIC_DIR / safe_artist / safe_album
        dest_dir.mkdir(parents=True, exist_ok=True)

        temp_out = str(TEMP_DIR / f"{job_id}.%(ext)s")
        final_path = dest_dir / f"{filename_stem}.mp3"

        ydl_opts = {
            "quiet": True,
            "no_warnings": True,
            "format": "bestaudio/best",
            "outtmpl": temp_out,
            "postprocessors": [{
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "0",
            }],
            "progress_hooks": [_make_progress_hook(job_id)],
        }

        url = f"https://www.youtube.com/watch?v={video_id}"
        await loop.run_in_executor(None, _yt_download, url, ydl_opts)

        # Move temp file to library
        temp_mp3 = TEMP_DIR / f"{job_id}.mp3"
        if not temp_mp3.exists():
            job["status"] = "error"
            job["error"] = "Converted file not found — is ffmpeg installed?"
            return

        # shutil.move falls back to copy + delete when the temp and library
        # directories are on different filesystems, where a plain rename
        # fails. It may copy, so keep it off the event loop.
        await loop.run_in_executor(
            None, shutil.move, str(temp_mp3), str(final_path)
        )
    except Exception as exc:
        # Any failure up to here must surface in the job record; otherwise
        # pollers would see "downloading" forever.
        job["status"] = "error"
        job["error"] = str(exc)
        return
    finally:
        _cleanup_temp_files(job_id)

    # Write ID3 tags + embed album art
    job["status"] = "tagging"
    try:
        await tagger.tag_file(
            mp3_path=final_path,
            track_name=track_name,
            artist_name=artist,
            album_name=album,
            track_number=track_number,
            year=year,
            genre=genre,
            artwork_url=artwork_url,
        )
    except Exception as exc:
        # Tagging failure is non-fatal — file is still saved
        job["error"] = f"Tagging warning: {exc}"

    job["status"] = "done"
    job["progress"] = 100
    job["filename"] = str(final_path.relative_to(MUSIC_DIR))


def _cleanup_temp_files(job_id: str) -> None:
    """Best-effort removal of any files this job left behind in TEMP_DIR."""
    try:
        leftovers = list(TEMP_DIR.glob(f"{job_id}.*"))
    except OSError:
        return
    for leftover in leftovers:
        try:
            leftover.unlink()
        except OSError:
            # Cleanup must never mask the job's real outcome.
            continue


def _yt_download(url: str, opts: dict) -> None:
    """Blocking yt-dlp download of a single URL; meant to run in an executor."""
    with yt_dlp.YoutubeDL(opts) as ydl:
        ydl.download([url])


def _make_progress_hook(job_id: str):
    """Build a yt-dlp progress hook that updates the job's ``progress`` field.

    The download phase is mapped onto 0-85; the remainder is left for tagging.
    """
    def hook(d: dict) -> None:
        status = d.get("status")
        if status == "downloading":
            total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
            downloaded = d.get("downloaded_bytes") or 0
            if total:
                # The size may be only an estimate, so clamp to the range
                # reserved for the download phase.
                percent = int(downloaded / total * 85)
                _jobs[job_id]["progress"] = max(0, min(85, percent))
        elif status == "finished":
            _jobs[job_id]["progress"] = 85  # tagging will push to 100
    return hook


def get_job_status(job_id: str) -> dict[str, Any] | None:
    """Return the job record for ``job_id``, or None if no such job exists."""
    return _jobs.get(job_id)


def _safe_name(name: str) -> str:
    """Remove characters that are illegal in filenames/directory names.

    Also strips non-whitespace control characters (NUL included), collapses
    runs of whitespace to a single space, and trims leading/trailing dots and
    spaces. Falls back to "Unknown" when nothing is left.
    """
    name = name.strip()
    name = re.sub(r'[<>:"/\\|?*\x00-\x08\x0e-\x1b\x7f]', "", name)
    name = re.sub(r"\s+", " ", name).strip(". ")
    return name or "Unknown"