Initial commit, prod ready

This commit is contained in:
2026-02-22 23:18:57 -05:00
commit 0d03be47a0
41 changed files with 5623 additions and 0 deletions

3
backend/.env Normal file
View File

@@ -0,0 +1,3 @@
# Path to your Plex music library folder
# Change this to wherever Plex reads music from on your server
MUSIC_DIR=./Music

5
backend/.env.example Normal file
View File

@@ -0,0 +1,5 @@
# Path to your Plex music library folder
MUSIC_DIR=/mnt/media/Music
# Optional: restrict CORS to your frontend URL in production
# FRONTEND_URL=http://your-server-ip:5173

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

13
backend/config.py Normal file
View File

@@ -0,0 +1,13 @@
import os
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
# Where downloaded music is stored — must match your Plex library path
MUSIC_DIR = Path(os.getenv("MUSIC_DIR", "./Music")).resolve()
MUSIC_DIR.mkdir(parents=True, exist_ok=True)
# Temp working directory for in-progress downloads
TEMP_DIR = Path(os.getenv("TEMP_DIR", "./tmp")).resolve()
TEMP_DIR.mkdir(parents=True, exist_ok=True)

108
backend/library.py Normal file
View File

@@ -0,0 +1,108 @@
"""
Music library helpers — read folder tree, delete files/folders.
File serving is handled directly in main.py via FastAPI's FileResponse.
"""
import shutil
from pathlib import Path
from typing import Any
from config import MUSIC_DIR
def get_library() -> list[dict[str, Any]]:
"""
Walk the Music directory and return a nested structure:
[
{
"artist": "Queen",
"albums": [
{
"album": "A Night at the Opera",
"tracks": [
{ "filename": "01 - Bohemian Rhapsody.mp3", "path": "Queen/A Night at the Opera/01 - Bohemian Rhapsody.mp3" }
]
}
]
}
]
"""
artists: list[dict[str, Any]] = []
if not MUSIC_DIR.exists():
return artists
for artist_dir in sorted(MUSIC_DIR.iterdir()):
if not artist_dir.is_dir():
continue
albums: list[dict[str, Any]] = []
for album_dir in sorted(artist_dir.iterdir()):
if not album_dir.is_dir():
continue
tracks = []
for track_file in sorted(album_dir.iterdir()):
if track_file.suffix.lower() not in {".mp3", ".flac", ".m4a", ".ogg"}:
continue
rel = track_file.relative_to(MUSIC_DIR)
tracks.append({
"filename": track_file.name,
"path": rel.as_posix(),
"size": track_file.stat().st_size,
})
if tracks:
albums.append({"album": album_dir.name, "tracks": tracks})
if albums:
artists.append({"artist": artist_dir.name, "albums": albums})
return artists
def delete_path(rel_path: str) -> None:
"""
Delete a file or directory relative to MUSIC_DIR.
Raises ValueError if the resolved path escapes MUSIC_DIR (path traversal guard).
"""
target = (MUSIC_DIR / rel_path).resolve()
# Security: ensure target is inside MUSIC_DIR
if not str(target).startswith(str(MUSIC_DIR)):
raise ValueError("Path traversal detected.")
if not target.exists():
raise FileNotFoundError(f"Not found: {rel_path}")
if target.is_dir():
shutil.rmtree(target)
else:
target.unlink()
# Clean up empty parent directories (album → artist)
_remove_empty_parents(target.parent)
def _remove_empty_parents(directory: Path) -> None:
"""Walk up and remove directories that are now empty, stopping at MUSIC_DIR."""
current = directory
while current != MUSIC_DIR and current.is_dir():
if not any(current.iterdir()):
current.rmdir()
current = current.parent
else:
break
def resolve_track_path(rel_path: str) -> Path:
"""
Resolve a relative path (e.g. 'Artist/Album/track.mp3') to an absolute path
inside MUSIC_DIR. Raises ValueError on path traversal.
"""
target = (MUSIC_DIR / rel_path).resolve()
if not str(target).startswith(str(MUSIC_DIR)):
raise ValueError("Path traversal detected.")
if not target.exists():
raise FileNotFoundError(f"Not found: {rel_path}")
return target

133
backend/main.py Normal file
View File

@@ -0,0 +1,133 @@
"""
Music Orchestrator — FastAPI backend
"""
from typing import Any
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse
from pydantic import BaseModel
import search as music_search
import youtube
import library as lib
# In production, nginx sits in front and serves the frontend statically,
# proxying /api/* to this process on 127.0.0.1:8000. CORS is not needed
# because all requests appear same-origin to the browser.
# In dev, Vite's proxy does the same job. No CORS middleware required.
app = FastAPI(title="Music Orchestrator", docs_url=None, redoc_url=None)
# ── Request / Response models ─────────────────────────────────────────────────
class DownloadRequest(BaseModel):
videoId: str
trackName: str
artistName: str
albumName: str
trackNumber: int | None = None
year: str = ""
genre: str = ""
artworkUrl: str = ""
# ── Music search ──────────────────────────────────────────────────────────────
@app.get("/api/search/music")
async def search_music(q: str = Query(..., min_length=1)) -> list[dict[str, Any]]:
"""Search iTunes for song metadata."""
try:
return await music_search.search_music(q)
except Exception as exc:
raise HTTPException(status_code=502, detail=f"iTunes search failed: {exc}")
# ── YouTube search ────────────────────────────────────────────────────────────
@app.get("/api/search/youtube")
async def search_youtube(q: str = Query(..., min_length=1)) -> list[dict[str, Any]]:
"""Search YouTube and return top 5 results (no download)."""
try:
return await youtube.search_youtube(q)
except Exception as exc:
raise HTTPException(status_code=502, detail=f"YouTube search failed: {exc}")
# ── Download ──────────────────────────────────────────────────────────────────
@app.post("/api/download")
async def start_download(req: DownloadRequest) -> dict[str, str]:
"""
Start an async download + tag job. Returns a job_id to poll.
Metadata comes from iTunes (req body), not from the YouTube video.
"""
job_id = await youtube.start_download(
video_id=req.videoId,
artist=req.artistName,
album=req.albumName,
track_name=req.trackName,
track_number=req.trackNumber,
year=req.year,
genre=req.genre,
artwork_url=req.artworkUrl,
)
return {"jobId": job_id}
@app.get("/api/download/{job_id}/status")
def download_status(job_id: str) -> dict[str, Any]:
"""Poll download job status. Returns { status, progress, filename, error }."""
job = youtube.get_job_status(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found.")
return {
"status": job["status"],
"progress": job["progress"],
"filename": job.get("filename"),
"error": job.get("error"),
}
# ── Library ───────────────────────────────────────────────────────────────────
@app.get("/api/library")
def get_library() -> list[dict[str, Any]]:
"""Return the full nested artist → album → tracks structure."""
return lib.get_library()
@app.delete("/api/library")
def delete_library_item(path: str = Query(...)) -> dict[str, str]:
"""
Delete a file or folder inside the music library.
`path` is relative to MUSIC_DIR, e.g. 'Artist/Album/01 - Song.mp3'
"""
try:
lib.delete_path(path)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
except FileNotFoundError as exc:
raise HTTPException(status_code=404, detail=str(exc))
return {"deleted": path}
@app.get("/api/library/download")
def download_track(path: str = Query(...)) -> FileResponse:
"""
Stream a file from the music library to the browser as a download.
`path` is relative to MUSIC_DIR.
"""
try:
abs_path = lib.resolve_track_path(path)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
except FileNotFoundError as exc:
raise HTTPException(status_code=404, detail=str(exc))
return FileResponse(
path=str(abs_path),
media_type="audio/mpeg",
filename=abs_path.name,
headers={"Content-Disposition": f'attachment; filename="{abs_path.name}"'},
)

8
backend/requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
yt-dlp==2025.1.26
mutagen==1.47.0
httpx==0.28.1
python-multipart==0.0.20
python-dotenv==1.0.1
aiofiles==24.1.0

45
backend/search.py Normal file
View File

@@ -0,0 +1,45 @@
"""
Music metadata search via the iTunes Search API.
Free, no auth, returns track info + album art URLs.
"""
import httpx
from typing import Any
ITUNES_URL = "https://itunes.apple.com/search"
async def search_music(query: str, limit: int = 10) -> list[dict[str, Any]]:
params = {
"term": query,
"media": "music",
"entity": "song",
"limit": limit,
}
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(ITUNES_URL, params=params)
response.raise_for_status()
data = response.json()
results = []
for item in data.get("results", []):
# Upgrade artwork URL to 600x600 (iTunes returns 100x100 by default)
art_url = item.get("artworkUrl100", "")
if art_url:
art_url = art_url.replace("100x100bb", "600x600bb")
results.append({
"trackId": item.get("trackId"),
"trackName": item.get("trackName", ""),
"artistName": item.get("artistName", ""),
"collectionName": item.get("collectionName", ""),
"trackNumber": item.get("trackNumber"),
"discNumber": item.get("discNumber"),
"releaseDate": item.get("releaseDate", "")[:4], # just the year
"genre": item.get("primaryGenreName", ""),
"artworkUrl": art_url,
"previewUrl": item.get("previewUrl", ""),
})
return results

65
backend/tagger.py Normal file
View File

@@ -0,0 +1,65 @@
"""
Embeds ID3 metadata into a downloaded MP3 using mutagen.
Fetches album art from the iTunes artwork URL and embeds it as APIC.
"""
import httpx
from pathlib import Path
from mutagen.id3 import (
ID3, ID3NoHeaderError,
TIT2, TPE1, TALB, TRCK, TDRC, TCON, APIC,
)
async def tag_file(
mp3_path: Path,
track_name: str,
artist_name: str,
album_name: str,
track_number: int | None,
year: str,
genre: str,
artwork_url: str,
) -> None:
"""Write ID3v2 tags to mp3_path. Fetches artwork from artwork_url."""
# Load or create ID3 header
try:
tags = ID3(str(mp3_path))
except ID3NoHeaderError:
tags = ID3()
tags.delall("TIT2"); tags["TIT2"] = TIT2(encoding=3, text=track_name)
tags.delall("TPE1"); tags["TPE1"] = TPE1(encoding=3, text=artist_name)
tags.delall("TALB"); tags["TALB"] = TALB(encoding=3, text=album_name)
tags.delall("TDRC"); tags["TDRC"] = TDRC(encoding=3, text=year)
tags.delall("TCON"); tags["TCON"] = TCON(encoding=3, text=genre)
if track_number is not None:
tags.delall("TRCK")
tags["TRCK"] = TRCK(encoding=3, text=str(track_number))
# Fetch and embed album art
if artwork_url:
art_data = await _fetch_artwork(artwork_url)
if art_data:
tags.delall("APIC")
tags["APIC"] = APIC(
encoding=3,
mime="image/jpeg",
type=3, # front cover
desc="Cover",
data=art_data,
)
tags.save(str(mp3_path), v2_version=3) # ID3v2.3 for broadest device compatibility
async def _fetch_artwork(url: str) -> bytes | None:
try:
async with httpx.AsyncClient(timeout=15) as client:
r = await client.get(url)
r.raise_for_status()
return r.content
except Exception:
return None

207
backend/youtube.py Normal file
View File

@@ -0,0 +1,207 @@
"""
YouTube search and audio download via yt-dlp.
Search uses yt-dlp's built-in ytsearch (no API key needed).
Downloads extract audio as MP3 using ffmpeg post-processing.
"""
import asyncio
import re
import uuid
from pathlib import Path
from typing import Any
import yt_dlp
from config import MUSIC_DIR, TEMP_DIR
# In-memory store of active download jobs { job_id: { status, progress, ... } }
_jobs: dict[str, dict[str, Any]] = {}
# ── Search ────────────────────────────────────────────────────────────────────
async def search_youtube(query: str, max_results: int = 5) -> list[dict[str, Any]]:
"""Return top N YouTube results for a query without downloading anything."""
ydl_opts = {
"quiet": True,
"no_warnings": True,
"extract_flat": True,
"skip_download": True,
}
search_query = f"ytsearch{max_results}:{query}"
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _yt_search, search_query, ydl_opts)
def _yt_search(query: str, opts: dict) -> list[dict[str, Any]]:
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(query, download=False)
entries = info.get("entries", []) if info else []
results = []
for entry in entries:
if not entry:
continue
video_id = entry.get("id", "")
results.append({
"videoId": video_id,
"title": entry.get("title", ""),
"channel": entry.get("uploader") or entry.get("channel", ""),
"duration": _format_duration(entry.get("duration")),
"thumbnailUrl": f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg",
"watchUrl": f"https://www.youtube.com/watch?v={video_id}",
})
return results
def _format_duration(seconds: int | None) -> str:
if not seconds:
return ""
m, s = divmod(int(seconds), 60)
h, m = divmod(m, 60)
if h:
return f"{h}:{m:02d}:{s:02d}"
return f"{m}:{s:02d}"
# ── Download ──────────────────────────────────────────────────────────────────
async def start_download(
video_id: str,
artist: str,
album: str,
track_name: str,
track_number: int | None,
year: str,
genre: str,
artwork_url: str,
) -> str:
"""
Kick off a background download + tag job. Returns a job_id to poll.
Tagging runs inside the same background task after the download completes.
"""
job_id = str(uuid.uuid4())
_jobs[job_id] = {
"status": "queued",
"progress": 0,
"filename": None,
"error": None,
}
asyncio.create_task(
_run_download(
job_id, video_id, artist, album, track_name,
track_number, year, genre, artwork_url,
)
)
return job_id
async def _run_download(
job_id: str,
video_id: str,
artist: str,
album: str,
track_name: str,
track_number: int | None,
year: str,
genre: str,
artwork_url: str,
) -> None:
import tagger # local import to avoid circular deps
_jobs[job_id]["status"] = "downloading"
safe_artist = _safe_name(artist)
safe_album = _safe_name(album)
safe_title = _safe_name(track_name)
prefix = f"{track_number:02d} - " if track_number else ""
filename_stem = f"{prefix}{safe_title}"
dest_dir = MUSIC_DIR / safe_artist / safe_album
dest_dir.mkdir(parents=True, exist_ok=True)
temp_out = str(TEMP_DIR / f"{job_id}.%(ext)s")
final_path = dest_dir / f"{filename_stem}.mp3"
ydl_opts = {
"quiet": True,
"no_warnings": True,
"format": "bestaudio/best",
"outtmpl": temp_out,
"postprocessors": [{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "0",
}],
"progress_hooks": [_make_progress_hook(job_id)],
}
url = f"https://www.youtube.com/watch?v={video_id}"
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None, _yt_download, url, ydl_opts)
except Exception as exc:
_jobs[job_id]["status"] = "error"
_jobs[job_id]["error"] = str(exc)
return
# Move temp file to library
temp_mp3 = TEMP_DIR / f"{job_id}.mp3"
if not temp_mp3.exists():
_jobs[job_id]["status"] = "error"
_jobs[job_id]["error"] = "Converted file not found — is ffmpeg installed?"
return
temp_mp3.rename(final_path)
# Write ID3 tags + embed album art
_jobs[job_id]["status"] = "tagging"
try:
await tagger.tag_file(
mp3_path=final_path,
track_name=track_name,
artist_name=artist,
album_name=album,
track_number=track_number,
year=year,
genre=genre,
artwork_url=artwork_url,
)
except Exception as exc:
# Tagging failure is non-fatal — file is still saved
_jobs[job_id]["error"] = f"Tagging warning: {exc}"
_jobs[job_id]["status"] = "done"
_jobs[job_id]["progress"] = 100
_jobs[job_id]["filename"] = str(final_path.relative_to(MUSIC_DIR))
def _yt_download(url: str, opts: dict) -> None:
with yt_dlp.YoutubeDL(opts) as ydl:
ydl.download([url])
def _make_progress_hook(job_id: str):
def hook(d: dict) -> None:
if d["status"] == "downloading":
total = d.get("total_bytes") or d.get("total_bytes_estimate", 0)
downloaded = d.get("downloaded_bytes", 0)
if total:
_jobs[job_id]["progress"] = int(downloaded / total * 85)
elif d["status"] == "finished":
_jobs[job_id]["progress"] = 85 # tagging will push to 100
return hook
def get_job_status(job_id: str) -> dict[str, Any] | None:
return _jobs.get(job_id)
def _safe_name(name: str) -> str:
"""Remove characters that are illegal in filenames/directory names."""
name = name.strip()
name = re.sub(r'[<>:"/\\|?*]', "", name)
name = re.sub(r"\s+", " ", name).strip(". ")
return name or "Unknown"