Add Apple Music downloader core modules

This commit is contained in:
Rafael Moraes
2025-10-21 12:51:30 -03:00
parent 4841e0f356
commit c51dbf0e8b
7 changed files with 867 additions and 0 deletions
+8
View File
@@ -0,0 +1,8 @@
DEFAULT_SONG_DECRYPTION_KEY = "32b8ade1769e26b1ffb8986352793fc6"
IMAGE_FILE_EXTENSION_MAP = {
"jpeg": ".jpg",
"tiff": ".tif",
}
TEMP_PATH_TEMPLATE = "gamdl_temp_{}"
ILLEGAL_CHARS_RE = r'[\\/:*?"<>|;]'
ILLEGAL_CHAR_REPLACEMENT = "_"
+120
View File
@@ -0,0 +1,120 @@
import asyncio
from ..utils import safe_gather
from .downloader_base import AppleMusicBaseDownloader
from .downloader_song import AppleMusicSongDownloader
from .types import DownloadItem
from pathlib import Path
from .exceptions import (
MediaNotStreamableError,
MediaFormatNotAvailableError,
MediaFileAlreadyExistsError,
)
class AppleMusicDownloader:
def __init__(
self,
base_downloader: AppleMusicBaseDownloader,
song_downloader: AppleMusicSongDownloader,
):
self.base_downloader = base_downloader
self.song_downloader = song_downloader
async def get_single_download_item(
self,
media_metadata: dict,
) -> DownloadItem:
download_item = None
if media_metadata["type"] in {"songs", "library-songs"}:
download_item = await self.song_downloader.get_download_item(
media_metadata,
)
return download_item
async def get_album_download_items(
self,
album_metadata: dict,
) -> list[DownloadItem]:
tasks = []
for media_metadata in album_metadata["relationships"]["tracks"]["data"]:
if media_metadata["type"] in {"songs", "library-songs"}:
tasks.append(
asyncio.create_task(
self.song_downloader.get_download_item(
media_metadata,
)
)
)
download_items = await safe_gather(*tasks)
return download_items
async def download(self, download_item: DownloadItem) -> None:
try:
await self._download(download_item)
await self._final_processing(download_item)
finally:
self.base_downloader.cleanup_temp(download_item.random_uuid)
async def _download(
self,
download_item: DownloadItem,
) -> None:
if (
Path(download_item.final_path).exists()
and not self.base_downloader.overwrite
):
raise MediaFileAlreadyExistsError(download_item.final_path)
if not self.base_downloader.is_media_streamable(
download_item.media_metadata,
):
raise MediaNotStreamableError(
download_item.media_metadata["id"],
)
if (
not download_item.stream_info
or not download_item.stream_info.audio_track.widevine_pssh
):
raise MediaFormatNotAvailableError(
download_item.media_metadata["id"],
)
if download_item.media_metadata["type"] in {"songs", "library-songs"}:
await self.song_downloader.download(download_item)
async def _final_processing(
self,
download_item: DownloadItem,
) -> None:
if self.base_downloader.skip_processing:
return
if Path(download_item.staged_path).exists():
self.base_downloader.move_to_final_path(
download_item.staged_path,
download_item.final_path,
)
if download_item.cover_path and self.base_downloader.save_cover:
cover_url = self.base_downloader.get_cover_url(
download_item.cover_url_template,
)
cover_bytes = await self.base_downloader.get_cover_bytes(cover_url)
if cover_bytes:
self.base_downloader.write_cover_image(
cover_bytes,
download_item.cover_path,
)
if (
download_item.lyrics
and download_item.lyrics.synced
and not self.song_downloader.skip_synced_lyrics
):
self.song_downloader.write_synced_lyrics(
download_item.lyrics.synced,
download_item.synced_lyrics_path,
)
+381
View File
@@ -0,0 +1,381 @@
import asyncio
import re
import shutil
import uuid
from io import BytesIO
from pathlib import Path
import httpx
from async_lru import alru_cache
from mutagen.mp4 import MP4, MP4Cover
from PIL import Image
from pywidevine import Cdm, Device
from yt_dlp import YoutubeDL
from ..api.apple_music_api import AppleMusicApi
from ..interface.interface import AppleMusicInterface
from ..interface.types import MediaTags, PlaylistTags
from ..utils import async_subprocess, raise_for_status
from .constants import (
ILLEGAL_CHAR_REPLACEMENT,
ILLEGAL_CHARS_RE,
IMAGE_FILE_EXTENSION_MAP,
TEMP_PATH_TEMPLATE,
)
from .enums import CoverFormat, DownloadMode, RemuxMode
from .hardcoded_wvd import HARDCODED_WVD
class AppleMusicBaseDownloader:
def __init__(
self,
api: AppleMusicApi,
output_path: str = "./Apple Music",
temp_path: str = ".",
wvd_path: str = None,
overwrite: bool = False,
save_cover: bool = False,
save_playlist: bool = False,
no_synced_lyrics: bool = False,
synced_lyrics_only: bool = False,
nm3u8dlre_path: str = "N_m3u8DL-RE",
mp4decrypt_path: str = "mp4decrypt",
ffmpeg_path: str = "ffmpeg",
mp4box_path: str = "MP4Box",
download_mode: DownloadMode = DownloadMode.YTDLP,
remux_mode: RemuxMode = RemuxMode.FFMPEG,
cover_format: CoverFormat = CoverFormat.JPG,
template_folder_album: str = "{album_artist}/{album}",
template_folder_compilation: str = "Compilations/{album}",
template_file_single_disc: str = "{track:02d} {title}",
template_file_multi_disc: str = "{disc}-{track:02d} {title}",
template_folder_no_album: str = "{artist}/Unknown Album",
template_file_no_album: str = "{title}",
template_file_playlist: str = "Playlists/{playlist_artist}/{playlist_title}",
template_date: str = "%Y-%m-%dT%H:%M:%SZ",
exclude_tags: list[str] = None,
cover_size: int = 1200,
truncate: int = None,
database_path: str = None,
silent: bool = False,
skip_processing: bool = False,
):
self.api = api
self.output_path = output_path
self.temp_path = temp_path
self.wvd_path = wvd_path
self.overwrite = overwrite
self.save_cover = save_cover
self.save_playlist = save_playlist
self.no_synced_lyrics = no_synced_lyrics
self.synced_lyrics_only = synced_lyrics_only
self.nm3u8dlre_path = nm3u8dlre_path
self.mp4decrypt_path = mp4decrypt_path
self.ffmpeg_path = ffmpeg_path
self.mp4box_path = mp4box_path
self.download_mode = download_mode
self.remux_mode = remux_mode
self.cover_format = cover_format
self.template_folder_album = template_folder_album
self.template_folder_compilation = template_folder_compilation
self.template_file_single_disc = template_file_single_disc
self.template_file_multi_disc = template_file_multi_disc
self.template_folder_no_album = template_folder_no_album
self.template_file_no_album = template_file_no_album
self.template_file_playlist = template_file_playlist
self.template_date = template_date
self.exclude_tags = exclude_tags
self.cover_size = cover_size
self.truncate = truncate
self.database_path = database_path
self.silent = silent
self.skip_processing = skip_processing
def setup(self):
self._setup_binary_paths()
self._setup_cdm()
self._setup_interface()
def _setup_binary_paths(self):
self.full_n3u8dlre_path = shutil.which(self.nm3u8dlre_path)
self.full_mp4decrypt_path = shutil.which(self.mp4decrypt_path)
self.full_ffmpeg_path = shutil.which(self.ffmpeg_path)
self.full_mp4box_path = shutil.which(self.mp4box_path)
def _setup_cdm(self):
if self.wvd_path:
self.cdm = Cdm.from_device(Device.load(self.wvd_path))
else:
self.cdm = Cdm.from_device(Device.loads(HARDCODED_WVD))
def _setup_interface(self):
self.interface = AppleMusicInterface(self.api)
def get_random_uuid(self) -> str:
return uuid.uuid4().hex[:8]
def is_media_streamable(
self,
media_metadata: dict,
) -> bool:
return bool(media_metadata["attributes"].get("playParams"))
async def get_cover_file_extension(self, cover_url_template: str) -> str | None:
if self.cover_format != CoverFormat.RAW:
return f".{self.cover_format.value}"
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(BytesIO(self.get_cover_bytes(cover_url)))
image_format = image_obj.format.lower()
return IMAGE_FILE_EXTENSION_MAP.get(
image_format,
f".{image_format.lower()}",
)
def get_playlist_tags(
self,
playlist_metadata: dict,
media_metadata: dict,
) -> PlaylistTags:
playlist_track = (
playlist_metadata["relationships"]["tracks"]["data"].index(media_metadata)
+ 1
)
return PlaylistTags(
playlist_artist=playlist_metadata["attributes"].get(
"curatorName", "Unknown"
),
playlist_id=playlist_metadata["attributes"]["playParams"]["id"],
playlist_title=playlist_metadata["attributes"]["name"],
playlist_track=playlist_track,
)
def get_temp_path(
self,
media_id: str,
folder_tag: str,
file_tag: str,
file_extension: str,
) -> str:
return str(
Path(self.temp_path)
/ TEMP_PATH_TEMPLATE.format(folder_tag)
/ (f"{media_id}_{file_tag}" + file_extension)
)
@alru_cache()
async def get_cover_bytes(self, cover_url: str) -> bytes | None:
async with httpx.AsyncClient() as client:
response = await client.get(cover_url)
raise_for_status(response, {200, 404})
if response.status_code == 200:
return response.content
return None
def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str:
dirty_string = re.sub(
ILLEGAL_CHARS_RE,
ILLEGAL_CHAR_REPLACEMENT,
dirty_string,
)
if is_folder:
dirty_string = dirty_string[: self.truncate]
if dirty_string.endswith("."):
dirty_string = dirty_string[:-1] + ILLEGAL_CHAR_REPLACEMENT
else:
if self.truncate is not None:
dirty_string = dirty_string[: self.truncate - 4]
return dirty_string.strip()
def get_final_path(
self,
tags: MediaTags,
file_extension: str,
playlist_tags: PlaylistTags,
) -> str:
if tags.album is not None:
template_folder = (
self.template_folder_compilation.split("/")
if tags.compilation
else self.template_folder_album.split("/")
)
template_file = (
self.template_file_multi_disc.split("/")
if tags.disc_total > 1
else self.template_file_single_disc.split("/")
)
else:
template_folder = self.template_folder_no_album.split("/")
template_file = self.template_file_no_album.split("/")
template_final = template_folder + template_file
tags_dict = tags.__dict__.copy()
if playlist_tags:
tags_dict.update(playlist_tags.__dict__)
return str(
Path(
self.output_path,
*[
self.get_sanitized_string(i.format(**tags_dict), True)
for i in template_final[0:-1]
],
(
self.get_sanitized_string(
template_final[-1].format(**tags_dict), False
)
+ file_extension
),
)
)
def get_cover_url_template(self, metadata: dict) -> str:
if self.cover_format == CoverFormat.RAW:
return self._get_raw_cover_url(metadata["attributes"]["artwork"]["url"])
return metadata["attributes"]["artwork"]["url"]
def _get_raw_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"image/thumb/",
"",
re.sub(
r"is1-ssl",
"a1",
cover_url_template,
),
)
def get_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
(
f"{self.cover_size}x{self.cover_size}bb.{self.cover_format.value}"
if self.cover_format != CoverFormat.RAW
else ""
),
cover_url_template,
)
async def download_stream(self, stream_url: str, download_path: str):
if self.download_mode == DownloadMode.YTDLP:
await self.download_ytdlp(stream_url, download_path)
if self.download_mode == DownloadMode.NM3U8DLRE:
await self.download_nm3u8dlre(stream_url, download_path)
async def download_ytdlp(self, stream_url: str, download_path: str) -> None:
await asyncio.to_thread(
self._download_ytdlp,
stream_url,
download_path,
)
def _download_ytdlp(self, stream_url: str, download_path: str) -> None:
with YoutubeDL(
{
"quiet": True,
"no_warnings": True,
"outtmpl": download_path,
"allow_unplayable_formats": True,
"overwrites": True,
"fixup": "never",
"noprogress": self.silent,
"allowed_extractors": ["generic"],
}
) as ydl:
ydl.download(stream_url)
async def download_nm3u8dlre(self, stream_url: str, download_path: str):
download_path_obj = Path(download_path)
download_path_obj.parent.mkdir(parents=True, exist_ok=True)
await async_subprocess(
self.full_n3u8dlre_path,
stream_url,
"--binary-merge",
"--no-log",
"--log-level",
"off",
"--ffmpeg-binary-path",
self.full_ffmpeg_path,
"--save-name",
download_path_obj.stem,
"--save-dir",
download_path_obj.parent,
"--tmp-dir",
download_path_obj.parent,
silent=self.silent,
)
async def apply_tags(
self,
media_path: Path,
tags: MediaTags,
cover_url_template: str,
):
exclude_tags = self.exclude_tags or []
filtered_tags = MediaTags(
**{
k: v
for k, v in tags.__dict__.items()
if v is not None and k not in exclude_tags
}
)
mp4_tags = filtered_tags.as_mp4_tags(self.template_date)
skip_tagging = "all" in exclude_tags
mp4 = MP4(media_path)
mp4.clear()
if not skip_tagging:
if "cover" not in exclude_tags and self.cover_format != CoverFormat.RAW:
await self._apply_cover(mp4, cover_url_template)
mp4.update(mp4_tags)
mp4.save()
async def _apply_cover(
self,
mp4: MP4,
cover_url_template: str,
) -> None:
cover_url = self.get_cover_url(cover_url_template)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return
mp4["covr"] = [
MP4Cover(
data=cover_bytes,
imageformat=(
MP4Cover.FORMAT_JPEG
if self.cover_format == CoverFormat.JPG
else MP4Cover.FORMAT_PNG
),
)
]
def move_to_final_path(self, stage_path: str, final_path: str) -> None:
Path(final_path).parent.mkdir(parents=True, exist_ok=True)
shutil.move(stage_path, final_path)
def write_cover_image(
self,
cover_bytes: bytes,
cover_path: str,
) -> None:
Path(cover_path).parent.mkdir(parents=True, exist_ok=True)
Path(cover_path).write_bytes(cover_bytes)
def cleanup_temp(self, random_uuid: str) -> None:
temp_folder = Path(self.temp_path) / TEMP_PATH_TEMPLATE.format(random_uuid)
if temp_folder.exists():
shutil.rmtree(temp_folder)
+298
View File
@@ -0,0 +1,298 @@
from pathlib import Path
from ..interface.enums import SongCodec, SyncedLyricsFormat
from ..interface.interface_song import AppleMusicSongInterface
from ..interface.types import DecryptionKeyAv
from ..utils import async_subprocess
from .constants import DEFAULT_SONG_DECRYPTION_KEY
from .downloader_base import AppleMusicBaseDownloader
from .enums import RemuxMode
from .types import DownloadItem
class AppleMusicSongDownloader:
def __init__(
self,
downloader: AppleMusicBaseDownloader,
song_codec: SongCodec = SongCodec.AAC_LEGACY,
synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC,
skip_synced_lyrics: bool = False,
synced_lyrics_only: bool = False,
):
self.downloader = downloader
self.song_codec = song_codec
self.synced_lyrics_format = synced_lyrics_format
self.skip_synced_lyrics = skip_synced_lyrics
self.synced_lyrics_only = synced_lyrics_only
def setup(self):
self._setup_interface()
def _setup_interface(self):
self.song_interface = AppleMusicSongInterface(self.downloader.interface)
async def get_download_item(
self,
song_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
download_item = DownloadItem()
download_item.media_metadata = song_metadata
song_id = self.downloader.interface.get_media_id_of_library_media(song_metadata)
download_item.lyrics = await self.song_interface.get_lyrics(
song_metadata,
self.synced_lyrics_format,
)
if self.synced_lyrics_only:
return download_item
webplayback = await self.downloader.api.get_webplayback(song_id)
download_item.media_tags = self.song_interface.get_tags(
webplayback,
download_item.lyrics.unsynced if download_item.lyrics else None,
)
if playlist_metadata:
download_item.playlist_tags = self.downloader.get_playlist_tags(
playlist_metadata,
song_metadata,
)
if self.song_codec.is_legacy():
download_item.stream_info = (
await self.song_interface.get_stream_info_legacy(
webplayback,
self.song_codec,
)
)
download_item.decryption_key = (
await self.song_interface.get_decryption_key_legacy(
download_item.stream_info,
self.downloader.cdm,
)
)
else:
download_item.stream_info = await self.song_interface.get_stream_info(
song_metadata,
self.song_codec,
)
if (
download_item.stream_info
and download_item.stream_info.audio_track.widevine_pssh
):
download_item.decryption_key = (
await self.song_interface.get_decryption_key(
download_item.stream_info,
self.downloader.cdm,
)
)
else:
download_item.decryption_key = None
download_item.cover_url_template = self.downloader.get_cover_url_template(
song_metadata
)
download_item.random_uuid = self.downloader.get_random_uuid()
download_item.staged_path = self.downloader.get_temp_path(
song_id,
download_item.random_uuid,
"staged",
"." + download_item.stream_info.file_format.value,
)
download_item.final_path = self.downloader.get_final_path(
download_item.media_tags,
".m4a",
playlist_metadata,
)
download_item.synced_lyrics_path = self.get_lyrics_synced_path(
download_item.final_path,
)
cover_file_extension = await self.downloader.get_cover_file_extension(
download_item.cover_url_template,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
download_item.final_path,
cover_file_extension,
)
return download_item
def fix_key_id(self, input_path: str):
count = 0
with open(input_path, "rb+") as file:
while data := file.read(4096):
pos = file.tell()
i = 0
while tenc := max(0, data.find(b"tenc", i)):
kid = tenc + 12
file.seek(max(0, pos - 4096) + kid, 0)
file.write(bytes.fromhex(f"{count:032}"))
count += 1
i = kid + 1
file.seek(pos, 0)
async def remux_mp4box(self, input_path: str, output_path: str):
await async_subprocess(
self.downloader.full_mp4box_path,
"-quiet",
"-add",
input_path,
"-itags",
"artist=placeholder",
"-keep-utc",
"-new",
output_path,
silent=self.downloader.silent,
)
async def remux_ffmpeg(
self,
input_path: str,
output_path: str,
decryption_key: str = None,
):
if decryption_key:
key = [
"-decryption_key",
decryption_key,
]
else:
key = []
await async_subprocess(
self.downloader.full_ffmpeg_path,
"-loglevel",
"error",
"-y",
*key,
"-i",
input_path,
"-c",
"copy",
"-movflags",
"+faststart",
output_path,
silent=self.downloader.silent,
)
async def decrypt_mp4decrypt(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool,
):
if legacy:
keys = [
"--key",
f"1:{decryption_key}",
]
else:
self.fix_key_id(input_path)
keys = [
"--key",
"0" * 31 + "1" + f":{decryption_key}",
"--key",
"0" * 32 + f":{DEFAULT_SONG_DECRYPTION_KEY}",
]
await async_subprocess(
self.downloader.full_mp4decrypt_path,
*keys,
input_path,
output_path,
silent=self.downloader.silent,
)
async def stage(
self,
encrypted_path: str,
decrypted_path: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
codec: SongCodec,
):
if codec.is_legacy() and self.downloader.remux_mode == RemuxMode.FFMPEG:
await self.remux_ffmpeg(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
)
else:
await self.decrypt_mp4decrypt(
encrypted_path,
decrypted_path,
decryption_key.audio_track.key,
codec.is_legacy(),
)
if self.downloader.remux_mode == RemuxMode.FFMPEG:
await self.remux_ffmpeg(
decrypted_path,
staged_path,
)
else:
await self.remux_mp4box(
decrypted_path,
staged_path,
)
def get_lyrics_synced_path(self, final_path: str) -> str:
return str(Path(final_path).with_suffix("." + self.synced_lyrics_format.value))
def get_cover_path(
self,
final_path: str,
file_extension: str,
) -> str:
return str(Path(final_path).parent / ("Cover" + file_extension))
def write_synced_lyrics(
self,
synced_lyrics: str,
lyrics_synced_path: str,
):
Path(lyrics_synced_path).parent.mkdir(parents=True, exist_ok=True)
Path(lyrics_synced_path).write_text(synced_lyrics, encoding="utf8")
async def download(
self,
download_item: DownloadItem,
) -> None:
if self.synced_lyrics_only:
return
encrypted_path = self.downloader.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"encrypted",
".m4a",
)
await self.downloader.download_stream(
download_item.stream_info.audio_track.stream_url,
encrypted_path,
)
decrypted_path = self.downloader.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"decrypted",
".m4a",
)
await self.stage(
encrypted_path,
decrypted_path,
download_item.staged_path,
download_item.decryption_key,
self.song_codec,
)
await self.downloader.apply_tags(
download_item.staged_path,
download_item.media_tags,
download_item.cover_url_template,
)
+17
View File
@@ -0,0 +1,17 @@
from enum import Enum
class DownloadMode(Enum):
YTDLP = "ytdlp"
NM3U8DLRE = "nm3u8dlre"
class RemuxMode(Enum):
FFMPEG = "ffmpeg"
MP4BOX = "mp4box"
class CoverFormat(Enum):
JPG = "jpg"
PNG = "png"
RAW = "raw"
+18
View File
@@ -0,0 +1,18 @@
class MediaNotStreamableError(Exception):
def __init__(self, media_id: str):
super().__init__(
f'Media with ID "{media_id}" is not streamable'.format(media_id=media_id)
)
class MediaFileAlreadyExistsError(Exception):
def __init__(self, media_path: str):
super().__init__(f'Media file already exists at path "{media_path}"')
class MediaFormatNotAvailableError(Exception):
def __init__(self, media_id: str):
super().__init__(
f'Media with ID "{media_id}" is not available in the requested format'
)
+25
View File
@@ -0,0 +1,25 @@
from dataclasses import dataclass
from ..interface.types import (
DecryptionKeyAv,
Lyrics,
MediaTags,
PlaylistTags,
StreamInfoAv,
)
@dataclass
class DownloadItem:
media_metadata: dict = None
random_uuid: str = None
lyrics: Lyrics = None
media_tags: MediaTags = None
playlist_tags: PlaylistTags = None
stream_info: StreamInfoAv = None
decryption_key: DecryptionKeyAv = None
cover_url_template: str = None
staged_path: str = None
final_path: str = None
synced_lyrics_path: str = None
cover_path: str = None