208 lines
6.2 KiB
Python
208 lines
6.2 KiB
Python
"""
|
|
YouTube search and audio download via yt-dlp.
|
|
Search uses yt-dlp's built-in ytsearch (no API key needed).
|
|
Downloads extract audio as MP3 using ffmpeg post-processing.
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yt_dlp
|
|
|
|
from config import MUSIC_DIR, TEMP_DIR
|
|
|
|
# In-memory store of active download jobs { job_id: { status, progress, ... } }
|
|
# Each record carries "status", "progress", "filename" and "error".
# Statuses written by this module: "queued", "downloading", "tagging",
# "done", "error".
# NOTE(review): nothing in the visible code ever removes finished jobs, so
# this dict grows for the lifetime of the process — confirm that is intended.
_jobs: dict[str, dict[str, Any]] = {}
|
|
|
|
|
|
# ── Search ────────────────────────────────────────────────────────────────────
|
|
|
|
async def search_youtube(query: str, max_results: int = 5) -> list[dict[str, Any]]:
    """Return the top N YouTube results for a query without downloading anything.

    The blocking yt-dlp call is handed to the default executor so the event
    loop is not stalled while the search runs.

    Args:
        query: Free-text search terms. Surrounding whitespace is ignored.
        max_results: Maximum number of results to request via ``ytsearchN:``.

    Returns:
        The list of result dicts produced by ``_yt_search``. A blank query or
        a ``max_results`` below 1 yields ``[]`` without calling yt-dlp.
    """
    query = query.strip() if query else ""
    if not query or max_results < 1:
        # Nothing meaningful to ask for; avoid building "ytsearch0:" or an
        # empty search string.
        return []

    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,
        "skip_download": True,
    }
    search_query = f"ytsearch{max_results}:{query}"
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _yt_search, search_query, ydl_opts)
|
|
|
|
|
|
def _yt_search(query: str, opts: dict) -> list[dict[str, Any]]:
    """Run a blocking yt-dlp search and normalise the entries for the client.

    Args:
        query: A yt-dlp search expression such as ``"ytsearch5:some words"``.
        opts: Options passed straight to ``yt_dlp.YoutubeDL``.

    Returns:
        One dict per usable entry with the keys ``videoId``, ``title``,
        ``channel``, ``duration``, ``thumbnailUrl`` and ``watchUrl``. All
        values are strings. Entries without an id are skipped because no
        valid thumbnail or watch URL can be built for them.
    """
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(query, download=False)

    # A key can be present with an explicit None value, in which case
    # `.get(key, default)` returns None rather than the default — hence the
    # `or` fallbacks below.
    entries = (info or {}).get("entries") or []

    results: list[dict[str, Any]] = []
    for entry in entries:
        video_id = entry.get("id") if entry else None
        if not video_id:
            continue
        results.append({
            "videoId": video_id,
            "title": entry.get("title") or "",
            "channel": entry.get("uploader") or entry.get("channel") or "",
            "duration": _format_duration(entry.get("duration")),
            "thumbnailUrl": f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
            "watchUrl": f"https://www.youtube.com/watch?v={video_id}",
        })
    return results
|
|
|
|
|
|
def _format_duration(seconds: int | None) -> str:
|
|
if not seconds:
|
|
return ""
|
|
m, s = divmod(int(seconds), 60)
|
|
h, m = divmod(m, 60)
|
|
if h:
|
|
return f"{h}:{m:02d}:{s:02d}"
|
|
return f"{m}:{s:02d}"
|
|
|
|
|
|
# ── Download ──────────────────────────────────────────────────────────────────
|
|
|
|
# Strong references to in-flight download tasks. The event loop itself only
# holds weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


async def start_download(
    video_id: str,
    artist: str,
    album: str,
    track_name: str,
    track_number: int | None,
    year: str,
    genre: str,
    artwork_url: str,
) -> str:
    """
    Kick off a background download + tag job. Returns a job_id to poll.

    Tagging runs inside the same background task after the download completes.

    Args:
        video_id: YouTube video id to download.
        artist: Artist name, used for the library folder and tags.
        album: Album name, used for the library folder and tags.
        track_name: Track title, used for the file name and tags.
        track_number: Optional track number; prefixes the file name when set.
        year: Release year passed through to the tagger.
        genre: Genre passed through to the tagger.
        artwork_url: Cover art URL passed through to the tagger.

    Returns:
        A freshly generated UUID string identifying the job in ``_jobs``.
    """
    job_id = str(uuid.uuid4())
    _jobs[job_id] = {
        "status": "queued",
        "progress": 0,
        "filename": None,
        "error": None,
    }

    task = asyncio.create_task(
        _run_download(
            job_id, video_id, artist, album, track_name,
            track_number, year, genre, artwork_url,
        )
    )
    # Keep the task alive until it completes, then drop the reference.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return job_id
|
|
|
|
|
|
async def _run_download(
    job_id: str,
    video_id: str,
    artist: str,
    album: str,
    track_name: str,
    track_number: int | None,
    year: str,
    genre: str,
    artwork_url: str,
) -> None:
    """Download one video as MP3, move it into the library and tag it.

    The outcome is reported only by mutating ``_jobs[job_id]``. This coroutine
    runs as a background task that nobody awaits, so ordinary failures are
    recorded on the job instead of being raised.

    Status flow: ``"downloading"`` -> ``"tagging"`` -> ``"done"``, or
    ``"error"`` with the message stored under ``"error"``. A tagging failure
    is non-fatal: the job still ends as ``"done"`` with a warning in
    ``"error"``.

    The file ends up at
    ``MUSIC_DIR/<artist>/<album>/[NN - ]<track_name>.mp3`` with every path
    component passed through ``_safe_name``.
    """
    import shutil  # function-scope import: only this function needs it

    job = _jobs[job_id]

    def fail(message: str) -> None:
        """Mark the job as failed and remove partial files left in TEMP_DIR."""
        job["status"] = "error"
        job["error"] = message
        try:
            for leftover in TEMP_DIR.glob(f"{job_id}.*"):
                leftover.unlink(missing_ok=True)
        except OSError:
            # Cleanup is best-effort; the job is already marked as failed.
            pass

    job["status"] = "downloading"

    try:
        import tagger  # local import to avoid circular deps

        safe_artist = _safe_name(artist)
        safe_album = _safe_name(album)
        safe_title = _safe_name(track_name)
        prefix = f"{track_number:02d} - " if track_number else ""
        filename_stem = f"{prefix}{safe_title}"

        dest_dir = MUSIC_DIR / safe_artist / safe_album
        dest_dir.mkdir(parents=True, exist_ok=True)

        temp_out = str(TEMP_DIR / f"{job_id}.%(ext)s")
        final_path = dest_dir / f"{filename_stem}.mp3"

        ydl_opts = {
            "quiet": True,
            "no_warnings": True,
            "format": "bestaudio/best",
            "outtmpl": temp_out,
            "postprocessors": [{
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "0",
            }],
            "progress_hooks": [_make_progress_hook(job_id)],
        }

        url = f"https://www.youtube.com/watch?v={video_id}"
        # yt-dlp is blocking, so run it in the default executor.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _yt_download, url, ydl_opts)
    except Exception as exc:
        # Covers import, path preparation and download errors alike, so the
        # job never stays stuck in "downloading".
        fail(str(exc))
        return

    # Move temp file to library
    temp_mp3 = TEMP_DIR / f"{job_id}.mp3"
    if not temp_mp3.exists():
        fail("Converted file not found — is ffmpeg installed?")
        return

    try:
        # shutil.move falls back to copy + delete when TEMP_DIR and MUSIC_DIR
        # are on different filesystems, where a plain rename raises OSError.
        shutil.move(str(temp_mp3), str(final_path))
    except OSError as exc:
        fail(f"Could not move file into library: {exc}")
        return

    # Write ID3 tags + embed album art
    job["status"] = "tagging"
    try:
        await tagger.tag_file(
            mp3_path=final_path,
            track_name=track_name,
            artist_name=artist,
            album_name=album,
            track_number=track_number,
            year=year,
            genre=genre,
            artwork_url=artwork_url,
        )
    except Exception as exc:
        # Tagging failure is non-fatal — file is still saved
        job["error"] = f"Tagging warning: {exc}"

    job["status"] = "done"
    job["progress"] = 100
    job["filename"] = str(final_path.relative_to(MUSIC_DIR))
|
|
|
|
|
|
def _yt_download(url: str, opts: dict) -> None:
    """Blocking helper: download ``url`` with a YoutubeDL built from ``opts``.

    Called through ``run_in_executor`` by ``_run_download`` so the event loop
    is not blocked. Any exception raised by yt-dlp propagates to the caller.
    """
    targets = [url]
    with yt_dlp.YoutubeDL(opts) as downloader:
        downloader.download(targets)
|
|
|
|
|
|
def _make_progress_hook(job_id: str):
    """Build a yt-dlp progress hook that mirrors progress into ``_jobs``.

    The download phase is mapped onto 0-85 of the job's ``"progress"`` value;
    ``_run_download`` sets it to 100 once the job is done.

    Args:
        job_id: Key of the job record in ``_jobs`` to update.

    Returns:
        A callable taking yt-dlp's progress dict. It never raises for missing
        or ``None`` fields, so a malformed update cannot abort the download.
    """
    download_share = 85  # portion of the progress bar given to downloading

    def hook(d: dict) -> None:
        job = _jobs.get(job_id)
        if job is None:
            # The job record is gone; there is nothing to update.
            return

        status = d.get("status")
        if status == "downloading":
            total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
            downloaded = d.get("downloaded_bytes") or 0
            if total > 0:
                # The size may only be an estimate and can undershoot, so
                # clamp to keep progress inside the download share.
                percent = int(downloaded / total * download_share)
                job["progress"] = max(0, min(download_share, percent))
        elif status == "finished":
            job["progress"] = download_share  # tagging will push to 100

    return hook
|
|
|
|
|
|
def get_job_status(job_id: str) -> dict[str, Any] | None:
    """Look up the record for ``job_id`` in ``_jobs``.

    Returns the stored dict itself (not a copy), or ``None`` when no job with
    that id is known.
    """
    try:
        return _jobs[job_id]
    except KeyError:
        return None
|
|
|
|
|
|
def _safe_name(name: str) -> str:
    """Remove characters that are illegal in filenames/directory names.

    Strips ``< > : " / \\ | ? *`` and non-whitespace control characters
    (including NUL), collapses runs of whitespace to a single space, and trims
    leading/trailing dots and spaces.

    Args:
        name: Raw artist, album or track name.

    Returns:
        The sanitised name, or ``"Unknown"`` when nothing usable remains
        (including empty or ``None`` input).
    """
    if not name:
        return "Unknown"
    # Whitespace-like control characters (\t, \n, \x1c-\x1f, ...) are left for
    # the \s+ pass below so they still become a single space.
    cleaned = re.sub(r'[<>:"/\\|?*\x00-\x08\x0e-\x1b\x7f]', "", name.strip())
    cleaned = re.sub(r"\s+", " ", cleaned).strip(". ")
    return cleaned or "Unknown"
|