Compare commits

...

13 Commits

Author SHA1 Message Date
Rafael Moraes 6b67c435fa Fix spacing in CLI warning message 2026-02-25 15:12:46 -03:00
Rafael Moraes 240ba7d4de Handle 404 ApiError for Apple Music calls 2026-02-25 15:09:52 -03:00
Rafael Moraes 02c19963b4 Clarify wrapper requirements in README 2026-02-25 14:55:33 -03:00
Rafael Moraes 2e2fef1426 Bump version to 2.9 2026-02-25 14:54:28 -03:00
Rafael Moraes ae3b2e1c6d Skip fetching covers when CoverFormat.RAW 2026-02-25 14:48:47 -03:00
Rafael Moraes 6516855be9 Fix Apple Music cover URL and async image read 2026-02-25 14:48:35 -03:00
Rafael Moraes 77cbb8a7ca Clarify README prerequisites and config table 2026-02-25 14:33:50 -03:00
Rafael Moraes 18bc6595a9 Add music_video_remux_mode and adjust checks 2026-02-25 14:33:32 -03:00
Rafael Moraes da2c3d5f1e Move remux_mode to music video downloader 2026-02-25 14:33:08 -03:00
Rafael Moraes abe364aad1 Remove unused imports in downloader_song.py 2026-02-25 14:32:29 -03:00
Rafael Moraes 10b529d6fd Remove hardcoded song decryption key 2026-02-25 14:08:57 -03:00
Rafael Moraes afe42848d0 Refactor song decryption and staging 2026-02-25 14:08:35 -03:00
Rafael Moraes b3b5e6d1b2 Add sample encryption parsing and hex-key decryption 2026-02-25 14:08:09 -03:00
14 changed files with 729 additions and 301 deletions
+59 -60
View File
@@ -26,16 +26,14 @@ A command-line app for downloading Apple Music songs, music videos and post vide
- **Apple Music Cookies** - Export your browser cookies in Netscape format while logged in with an active subscription at the Apple Music website:
- **Firefox**: [Export Cookies](https://addons.mozilla.org/addon/export-cookies-txt)
- **Chromium**: [Get cookies.txt LOCALLY](https://chromewebstore.google.com/detail/get-cookiestxt-locally/cclelndahbckbenkjhflpdbgdldlbecc)
- **FFmpeg** - Must be in your system PATH
- **Windows**: [AnimMouse's FFmpeg Builds](https://github.com/AnimMouse/ffmpeg-stable-autobuild/releases)
- **Linux**: [John Van Sickle's FFmpeg Builds](https://johnvansickle.com/ffmpeg/)
### Optional
Add these tools to your system PATH for additional features:
- **[mp4decrypt](https://www.bento4.com/downloads/)** - Required for `mp4box` remux mode, music videos, and experimental codecs
- **[MP4Box](https://gpac.io/downloads/gpac-nightly-builds/)** - Required for `mp4box` remux mode
- **[FFmpeg](https://ffmpeg.org/download.html)** - Required for `ffmpeg` music video remux mode
- **[mp4decrypt](https://www.bento4.com/downloads/)** - Required for `mp4box` music video remux mode
- **[MP4Box](https://gpac.io/downloads/gpac-nightly-builds/)** - Required for `mp4box` music video remux mode
- **[N_m3u8DL-RE](https://github.com/nilaoda/N_m3u8DL-RE/releases/latest)** - Required for `nm3u8dlre` download mode, which is faster than the default downloader
- **[Wrapper](#-wrapper)** - For downloading songs in ALAC and other experimental codecs without API limitations
@@ -109,61 +107,62 @@ The file is created automatically on first run. Command-line arguments override
### Configuration Options
| Option | Description | Default |
| ------------------------------- | ------------------------------- | ---------------------------------------------- |
| **General Options** | | |
| `--read-urls-as-txt`, `-r` | Read URLs from text files | `false` |
| `--config-path` | Config file path | `<home>/.gamdl/config.ini` |
| `--log-level` | Logging level | `INFO` |
| `--log-file` | Log file path | - |
| `--no-exceptions` | Don't print exceptions | `false` |
| `--no-config-file`, `-n` | Don't use a config file | `false` |
| **Apple Music Options** | | |
| `--cookies-path`, `-c` | Cookies file path | `./cookies.txt` |
| `--wrapper-account-url` | Wrapper account URL | `http://127.0.0.1:30020` |
| `--language`, `-l` | Metadata language | `en-US` |
| **Output Options** | | |
| `--output-path`, `-o` | Output directory path | `./Apple Music` |
| `--temp-path` | Temporary directory path | `.` |
| `--wvd-path` | .wvd file path | - |
| `--overwrite` | Overwrite existing files | `false` |
| `--save-cover`, `-s` | Save cover as separate file | `false` |
| `--save-playlist` | Save M3U8 playlist file | `false` |
| **Download Options** | | |
| `--nm3u8dlre-path` | N_m3u8DL-RE executable path | `N_m3u8DL-RE` |
| `--mp4decrypt-path` | mp4decrypt executable path | `mp4decrypt` |
| `--ffmpeg-path` | FFmpeg executable path | `ffmpeg` |
| `--mp4box-path` | MP4Box executable path | `MP4Box` |
| `--use-wrapper` | Use wrapper | `false` |
| `--wrapper-decrypt-ip` | Wrapper decryption server IP | `127.0.0.1:10020` |
| `--download-mode` | Download mode | `ytdlp` |
| `--remux-mode` | Remux mode | `ffmpeg` |
| `--cover-format` | Cover format | `jpg` |
| **Template Options** | | |
| `--album-folder-template` | Album folder template | `{album_artist}/{album}` |
| `--compilation-folder-template` | Compilation folder template | `Compilations/{album}` |
| `--no-album-folder-template` | No album folder template | `{artist}/Unknown Album` |
| `--single-disc-file-template` | Single disc file template | `{track:02d} {title}` |
| `--multi-disc-file-template` | Multi disc file template | `{disc}-{track:02d} {title}` |
| `--no-album-file-template` | No album file template | `{title}` |
| `--playlist-file-template` | Playlist file template | `Playlists/{playlist_artist}/{playlist_title}` |
| `--date-tag-template` | Date tag template | `%Y-%m-%dT%H:%M:%SZ` |
| `--exclude-tags` | Comma-separated tags to exclude | - |
| `--cover-size` | Cover size in pixels | `1200` |
| `--truncate` | Max filename length | - |
| **Song Options** | | |
| `--song-codec` | Song codec | `aac-legacy` |
| `--synced-lyrics-format` | Synced lyrics format | `lrc` |
| `--no-synced-lyrics` | Don't download synced lyrics | `false` |
| `--synced-lyrics-only` | Download only synced lyrics | `false` |
| `--use-album-date` | Use album release date for songs | `false` |
| Option | Description | Default |
| ------------------------------- | ----------------------------------------------------------------- | ---------------------------------------------- |
| **General Options** | | |
| `--read-urls-as-txt`, `-r` | Read URLs from text files | `false` |
| `--config-path` | Config file path | `<home>/.gamdl/config.ini` |
| `--log-level` | Logging level | `INFO` |
| `--log-file` | Log file path | - |
| `--no-exceptions` | Don't print exceptions | `false` |
| `--no-config-file`, `-n` | Don't use a config file | `false` |
| **Apple Music Options** | | |
| `--cookies-path`, `-c` | Cookies file path | `./cookies.txt` |
| `--wrapper-account-url` | Wrapper account URL | `http://127.0.0.1:30020` |
| `--language`, `-l` | Metadata language | `en-US` |
| **Output Options** | | |
| `--output-path`, `-o` | Output directory path | `./Apple Music` |
| `--temp-path` | Temporary directory path | `.` |
| `--wvd-path` | .wvd file path | - |
| `--overwrite` | Overwrite existing files | `false` |
| `--save-cover`, `-s` | Save cover as separate file | `false` |
| `--save-playlist` | Save M3U8 playlist file | `false` |
| **Download Options** | | |
| `--nm3u8dlre-path` | N_m3u8DL-RE executable path | `N_m3u8DL-RE` |
| `--mp4decrypt-path` | mp4decrypt executable path | `mp4decrypt` |
| `--ffmpeg-path` | FFmpeg executable path | `ffmpeg` |
| `--mp4box-path` | MP4Box executable path | `MP4Box` |
| `--use-wrapper` | Use wrapper | `false` |
| `--wrapper-decrypt-ip` | Wrapper decryption server IP | `127.0.0.1:10020` |
| `--download-mode` | Download mode | `ytdlp` |
| `--cover-format` | Cover format | `jpg` |
| **Template Options** | | |
| `--album-folder-template` | Album folder template | `{album_artist}/{album}` |
| `--compilation-folder-template` | Compilation folder template | `Compilations/{album}` |
| `--no-album-folder-template` | No album folder template | `{artist}/Unknown Album` |
| `--single-disc-file-template` | Single disc file template | `{track:02d} {title}` |
| `--multi-disc-file-template` | Multi disc file template | `{disc}-{track:02d} {title}` |
| `--no-album-file-template` | No album file template | `{title}` |
| `--playlist-file-template` | Playlist file template | `Playlists/{playlist_artist}/{playlist_title}` |
| `--date-tag-template` | Date tag template | `%Y-%m-%dT%H:%M:%SZ` |
| `--exclude-tags` | Comma-separated tags to exclude | - |
| `--cover-size` | Cover size in pixels | `1200` |
| `--truncate` | Max filename length | - |
| **Song Options** | | |
| `--song-codec` | Song codec | `aac-legacy` |
| `--synced-lyrics-format` | Synced lyrics format | `lrc` |
| `--no-synced-lyrics` | Don't download synced lyrics | `false` |
| `--synced-lyrics-only` | Download only synced lyrics | `false` |
| `--use-album-date` | Use album release date for songs | `false` |
| `--fetch-extra-tags` | Fetch extra tags from preview (normalization and smooth playback) | `false` |
| **Music Video Options** | | |
| `--music-video-codec-priority` | Comma-separated codec priority | `h264,h265` |
| `--music-video-remux-format` | Music video remux format | `m4v` |
| `--music-video-resolution` | Max music video resolution | `1080p` |
| **Post Video Options** | | |
| `--uploaded-video-quality` | Post video quality | `best` |
| **Music Video Options** | | |
| `--music-video-codec-priority` | Comma-separated codec priority | `h264,h265` |
| `--music-video-remux-mode` | Remux mode | `ffmpeg` |
| `--music-video-remux-format` | Music video remux format | `m4v` |
| `--music-video-resolution` | Max music video resolution | `1080p` |
| **Post Video Options** | | |
| `--uploaded-video-quality` | Post video quality | `best` |
### Template Variables
@@ -256,7 +255,7 @@ Use ISO 639-1 language codes (e.g., `en-US`, `es-ES`, `ja-JP`, `pt-BR`). Don't a
## ⚙️ Wrapper
Use the [wrapper](https://github.com/WorldObservationLog/wrapper) to download songs in ALAC and other experimental codecs without API limitations. Cookies, FFmpeg, MP4Box, or mp4decrypt are not required when using the wrapper.
Use the [wrapper](https://github.com/WorldObservationLog/wrapper) to download songs in ALAC and other experimental codecs without API limitations. Cookies are not required when using the wrapper.
### Setup Instructions
+1 -1
View File
@@ -1 +1 @@
__version__ = "2.8.7"
__version__ = "2.9"
+35 -27
View File
@@ -126,7 +126,6 @@ async def main(config: CliConfig):
use_wrapper=config.use_wrapper,
wrapper_decrypt_ip=config.wrapper_decrypt_ip,
download_mode=config.download_mode,
remux_mode=config.remux_mode,
cover_format=config.cover_format,
album_folder_template=config.album_folder_template,
compilation_folder_template=config.compilation_folder_template,
@@ -154,6 +153,7 @@ async def main(config: CliConfig):
base_downloader=base_downloader,
interface=music_video_interface,
codec_priority=config.music_video_codec_priority,
remux_mode=config.music_video_remux_mode,
remux_format=config.music_video_remux_format,
resolution=config.music_video_resolution,
)
@@ -171,30 +171,6 @@ async def main(config: CliConfig):
)
if not config.synced_lyrics_only:
if not config.use_wrapper:
if not base_downloader.full_ffmpeg_path and (
config.remux_mode == RemuxMode.FFMPEG
or config.download_mode == DownloadMode.NM3U8DLRE
):
logger.critical(X_NOT_IN_PATH.format("ffmpeg", config.ffmpeg_path))
return
if (
not base_downloader.full_mp4box_path
and config.remux_mode == RemuxMode.MP4BOX
):
logger.critical(X_NOT_IN_PATH.format("MP4Box", config.mp4box_path))
return
if not base_downloader.full_mp4decrypt_path and (
config.song_codec not in (SongCodec.AAC_LEGACY, SongCodec.AAC_HE_LEGACY)
or config.remux_mode == RemuxMode.MP4BOX
):
logger.critical(
X_NOT_IN_PATH.format("mp4decrypt", config.mp4decrypt_path)
)
return
if (
config.download_mode == DownloadMode.NM3U8DLRE
and not base_downloader.full_nm3u8dlre_path
@@ -202,10 +178,42 @@ async def main(config: CliConfig):
logger.critical(X_NOT_IN_PATH.format("N_m3u8DL-RE", config.nm3u8dlre_path))
return
missing_music_video_paths = []
if not base_downloader.full_ffmpeg_path and (
config.music_video_remux_mode == RemuxMode.FFMPEG
or config.download_mode == DownloadMode.NM3U8DLRE
):
missing_music_video_paths.append(
X_NOT_IN_PATH.format("ffmpeg", config.ffmpeg_path)
)
if (
not base_downloader.full_mp4box_path
and config.music_video_remux_mode == RemuxMode.MP4BOX
):
missing_music_video_paths.append(
X_NOT_IN_PATH.format("MP4Box", config.mp4box_path)
)
if not base_downloader.full_mp4decrypt_path and (
config.song_codec not in (SongCodec.AAC_LEGACY, SongCodec.AAC_HE_LEGACY)
or config.music_video_remux_mode == RemuxMode.MP4BOX
):
missing_music_video_paths.append(
X_NOT_IN_PATH.format("mp4decrypt", config.mp4decrypt_path)
)
if missing_music_video_paths:
logger.warning(
"Music videos will not be downloaded due to missing dependencies:\n"
+ "\n".join(missing_music_video_paths)
)
if not config.song_codec.is_legacy() and not config.use_wrapper:
logger.warning(
"You have chosen an experimental song codec"
" without enabling wrapper."
"You have chosen an experimental song codec "
"without enabling wrapper. "
"They're not guaranteed to work due to API limitations."
)
+9 -9
View File
@@ -261,15 +261,6 @@ class CliConfig:
type=DownloadMode,
),
]
remux_mode: Annotated[
RemuxMode,
option(
"--remux-mode",
help="Remux mode",
default=base_downloader_sig.parameters["remux_mode"].default,
type=RemuxMode,
),
]
cover_format: Annotated[
CoverFormat,
option(
@@ -431,6 +422,15 @@ class CliConfig:
type=Csv(MusicVideoCodec),
),
]
music_video_remux_mode: Annotated[
RemuxMode,
option(
"--music-video-remux-mode",
help="Remux mode",
default=music_video_downloader_sig.parameters["remux_mode"].default,
type=RemuxMode,
),
]
music_video_remux_format: Annotated[
RemuxFormatMusicVideo,
option(
+488 -3
View File
@@ -7,11 +7,17 @@ import asyncio
import io
import logging
import struct
import time
from dataclasses import dataclass, field
from typing import BinaryIO, List, Optional
from Crypto.Cipher import AES
logger = logging.getLogger(__name__)
# Default decryption key for songs without per-sample keys (legacy AAC)
DEFAULT_SONG_DECRYPTION_KEY = b"2\xb8\xad\xe1v\x9e&\xb1\xff\xb8\x98cRy?\xc6"
# Pre-fetch key used for first sample description
PREFETCH_KEY = "skd://itunes.apple.com/P000000000/s1/e1"
@@ -26,6 +32,20 @@ class SampleInfo:
data: bytes
duration: int
desc_index: int
iv: bytes = b"" # Per-sample IV from senc (empty if constant IV)
subsamples: List[tuple] = field(
default_factory=list
) # [(clear_bytes, encrypted_bytes), ...]
@dataclass
class EncryptionInfo:
"""Encryption scheme info extracted from sinf/schm + sinf/schi/tenc."""
scheme_type: str = "cbcs" # 'cenc' or 'cbcs'
per_sample_iv_size: int = 0 # 0, 8, or 16
constant_iv: bytes = b"" # Constant IV (when per_sample_iv_size == 0)
kid: bytes = b"" # Default Key ID (16 bytes)
@dataclass
@@ -35,6 +55,7 @@ class SongInfo:
samples: List[SampleInfo] = field(default_factory=list)
moov_data: bytes = b""
ftyp_data: bytes = b""
encryption_info: Optional[EncryptionInfo] = None
def read_box_header(f: BinaryIO) -> tuple[int, str, int]:
@@ -147,6 +168,10 @@ def extract_song(input_path: str) -> SongInfo:
)
logger.debug(f"Audio track ID: {audio_track_id}")
# Extract encryption scheme info from moov (sinf/schm + sinf/schi/tenc)
if song_info.moov_data:
song_info.encryption_info = _extract_encryption_info(song_info.moov_data)
# Parse moof/mdat pairs
moof_box = None
for box in boxes:
@@ -158,6 +183,11 @@ def extract_song(input_path: str) -> SongInfo:
mdat_data = box["data"][box["header_size"] :] # Skip mdat header
# Parse moof for tfhd (sample description index, defaults) and trun (entries)
_iv_size = (
song_info.encryption_info.per_sample_iv_size
if song_info.encryption_info
else 0
)
samples_from_pair = _parse_moof_mdat(
moof_data,
mdat_data,
@@ -166,6 +196,7 @@ def extract_song(input_path: str) -> SongInfo:
audio_track_id=audio_track_id,
moof_offset=moof_box["offset"],
mdat_data_offset=box["offset"] + box["header_size"],
per_sample_iv_size=_iv_size,
)
song_info.samples.extend(samples_from_pair)
moof_box = None
@@ -182,6 +213,7 @@ def _parse_moof_mdat(
audio_track_id: int = 1,
moof_offset: int = 0,
mdat_data_offset: int = 0,
per_sample_iv_size: int = 0,
) -> List[SampleInfo]:
"""Parse a moof box and extract samples from corresponding mdat.
@@ -192,6 +224,7 @@ def _parse_moof_mdat(
audio_track_id: Track ID of the audio track to extract.
moof_offset: Absolute file offset of the moof box.
mdat_data_offset: Absolute file offset of the mdat content (after header).
per_sample_iv_size: IV size per sample from tenc (0, 8, or 16).
"""
samples = []
@@ -216,6 +249,7 @@ def _parse_moof_mdat(
}
trun_entries = []
first_trun_data_offset = None
senc_entries = [] # Per-sample encryption info from senc box
traf_offset = offset + 8
traf_end = offset + size
@@ -241,6 +275,11 @@ def _parse_moof_mdat(
if first_trun_data_offset is None:
first_trun_data_offset = data_off
trun_entries.extend(entries)
elif inner_type == "senc":
senc_entries = _parse_senc(
moof_data[traf_offset + 8 : traf_offset + inner_size],
per_sample_iv_size,
)
traf_offset += inner_size
@@ -264,17 +303,26 @@ def _parse_moof_mdat(
if desc_index > 0:
desc_index -= 1 # Convert to 0-indexed
for entry in trun_entries:
for i, entry in enumerate(trun_entries):
sample_size = entry.get("size", tfhd_info["default_size"])
sample_duration = entry.get("duration", tfhd_info["default_duration"])
if sample_size > 0 and mdat_read_offset + sample_size <= len(mdat_data):
# Attach per-sample encryption info from senc if available
sample_iv = b""
sample_subsamples = []
if i < len(senc_entries):
sample_iv = senc_entries[i]["iv"]
sample_subsamples = senc_entries[i]["subsamples"]
sample = SampleInfo(
data=mdat_data[
mdat_read_offset : mdat_read_offset + sample_size
],
duration=sample_duration,
desc_index=desc_index,
iv=sample_iv,
subsamples=sample_subsamples,
)
samples.append(sample)
mdat_read_offset += sample_size
@@ -357,6 +405,56 @@ def _parse_trun(data: bytes, tfhd_info: dict) -> tuple[List[dict], Optional[int]
return entries, data_offset_value
def _parse_senc(data: bytes, per_sample_iv_size: int) -> List[dict]:
"""Parse Sample Encryption Box (senc) content (after box header).
Returns a list of dicts, one per sample:
{"iv": bytes, "subsamples": [(clear_bytes, encrypted_bytes), ...]}
The data starts after the 8-byte box header (size+type) but includes
the FullBox header (version 1 byte + flags 3 bytes).
per_sample_iv_size can be 0 (cbcs with constant IV from tenc) — in that case
there are 0 IV bytes per sample but subsample info may still be present.
"""
if len(data) < 8:
return []
version = data[0]
flags = struct.unpack(">I", b"\x00" + data[1:4])[0]
sample_count = struct.unpack(">I", data[4:8])[0]
entries: List[dict] = []
offset = 8
for _ in range(sample_count):
# Read per-sample IV (0 bytes when per_sample_iv_size == 0)
iv = b""
if per_sample_iv_size > 0:
if offset + per_sample_iv_size > len(data):
break
iv = data[offset : offset + per_sample_iv_size]
offset += per_sample_iv_size
subsamples = []
if flags & 0x02:
# Subsample encryption info present
if offset + 2 > len(data):
break
subsample_count = struct.unpack(">H", data[offset : offset + 2])[0]
offset += 2
for _ in range(subsample_count):
if offset + 6 > len(data):
break
clear_bytes = struct.unpack(">H", data[offset : offset + 2])[0]
encrypted_bytes = struct.unpack(">I", data[offset + 2 : offset + 6])[0]
subsamples.append((clear_bytes, encrypted_bytes))
offset += 6
entries.append({"iv": iv, "subsamples": subsamples})
return entries
async def decrypt_samples(
wrapper_ip: str,
track_id: str,
@@ -380,8 +478,6 @@ async def decrypt_samples(
Args:
progress_callback: Optional callback(current_sample, total_samples, bytes_processed) for progress tracking
"""
import time
host, port = wrapper_ip.split(":")
port = int(port)
@@ -1210,6 +1306,188 @@ def _write_udta(f):
_fixup_box_size(f, udta_start, b"udta")
def _extract_encryption_info(moov_data: bytes) -> Optional[EncryptionInfo]:
"""Extract encryption scheme info from the audio track's sinf box.
Walks moov → trak (audio) → mdia → minf → stbl → stsd → sample_entry → sinf,
then reads sinf/schm for scheme_type and sinf/schi/tenc for IV size, constant IV,
and default KID.
Returns EncryptionInfo or None if no sinf is found.
"""
trak_data = _find_audio_trak(moov_data)
if trak_data is None:
return None
# Navigate trak → mdia → minf → stbl → stsd
mdia = _find_child_box(trak_data, b"mdia")
if mdia is None:
return None
minf = _find_child_box(mdia, b"minf")
if minf is None:
return None
stbl = _find_child_box(minf, b"stbl")
if stbl is None:
return None
stsd = _find_child_box(stbl, b"stsd")
if stsd is None:
return None
# stsd is a FullBox: 4 (size) + 4 (type) + 4 (version+flags) + 4 (entry_count)
# Then the first sample entry immediately follows
if len(stsd) < 16:
return None
entry_offset = 16 # past header+version+flags+entry_count
if entry_offset + 8 > len(stsd):
return None
entry_size = struct.unpack(">I", stsd[entry_offset : entry_offset + 4])[0]
entry_data = stsd[entry_offset : entry_offset + entry_size]
# Find sinf inside this sample entry
# Audio sample entries have a 36-byte fixed header:
# size(4) + type(4) + reserved(6) + data_ref_index(2) + audio_data(20)
# Child boxes (including sinf) start at offset 36
sinf = _find_child_box(entry_data, b"sinf", skip_header=36)
if sinf is None:
return None
info = EncryptionInfo()
# Parse schm (Scheme Type Box) inside sinf
schm = _find_child_box(sinf, b"schm")
if schm and len(schm) >= 20:
# schm: 4(size) + 4(type) + 4(ver+flags) + 4(scheme_type) + 4(scheme_version)
info.scheme_type = schm[12:16].decode("ascii", errors="replace")
logger.debug(f"Encryption scheme: {info.scheme_type}")
# Parse tenc (Track Encryption Box) inside sinf/schi
schi = _find_child_box(sinf, b"schi")
if schi:
tenc = _find_child_box(schi, b"tenc")
if tenc and len(tenc) >= 32:
# tenc FullBox layout (offsets include 8-byte box header):
# [0:4] size
# [4:8] type "tenc"
# [8] version
# [9:12] flags (3 bytes)
# [12] reserved
# [13] reserved (v0) / crypt_byte_block|skip_byte_block (v1)
# [14] default_isProtected
# [15] default_Per_Sample_IV_Size
# [16:32] default_KID (16 bytes)
# if per_sample_iv_size==0:
# [32] default_constant_IV_size
# [33..] default_constant_IV
tenc_version = tenc[8]
per_sample_iv_size = tenc[15]
kid = tenc[16:32]
info.per_sample_iv_size = per_sample_iv_size
info.kid = kid
logger.debug(
f"tenc: per_sample_iv_size={per_sample_iv_size}, " f"kid={kid.hex()}"
)
# If per_sample_iv_size is 0, a constant IV follows the KID
if per_sample_iv_size == 0 and len(tenc) > 32:
constant_iv_size = tenc[32]
if len(tenc) >= 33 + constant_iv_size:
info.constant_iv = tenc[33 : 33 + constant_iv_size]
logger.debug(f"Constant IV: {info.constant_iv.hex()}")
return info
def _extract_encryption_info_per_stsd(moov_data: bytes) -> Optional[dict]:
"""Extract encryption scheme info for each stsd entry (sample description).
Returns a dict mapping desc_index (0-based) → EncryptionInfo, or None if no
encryption found. This handles cases where different sample descriptions have
different encryption parameters (e.g., different IVs or key schemes).
"""
trak_data = _find_audio_trak(moov_data)
if trak_data is None:
return None
# Navigate trak → mdia → minf → stbl → stsd
mdia = _find_child_box(trak_data, b"mdia")
if mdia is None:
return None
minf = _find_child_box(mdia, b"minf")
if minf is None:
return None
stbl = _find_child_box(minf, b"stbl")
if stbl is None:
return None
stsd = _find_child_box(stbl, b"stsd")
if stsd is None:
return None
if len(stsd) < 16:
return None
entry_count = struct.unpack(">I", stsd[12:16])[0]
if entry_count == 0:
return None
encryption_info_per_desc = {}
entry_offset = 16 # past header+version+flags+entry_count
for desc_idx in range(entry_count):
if entry_offset + 8 > len(stsd):
break
entry_size = struct.unpack(">I", stsd[entry_offset : entry_offset + 4])[0]
if entry_size < 8 or entry_offset + entry_size > len(stsd):
break
entry_data = stsd[entry_offset : entry_offset + entry_size]
# Find sinf inside this sample entry
# Audio sample entries have 36-byte fixed header before child boxes
sinf = _find_child_box(entry_data, b"sinf", skip_header=36)
if sinf is not None:
# Extract encryption info for this stsd entry
info = EncryptionInfo()
# Parse schm
schm = _find_child_box(sinf, b"schm")
if schm and len(schm) >= 20:
info.scheme_type = schm[12:16].decode("ascii", errors="replace")
logger.debug(
f"Encryption scheme for desc_index {desc_idx}: {info.scheme_type}"
)
# Parse tenc
schi = _find_child_box(sinf, b"schi")
if schi:
tenc = _find_child_box(schi, b"tenc")
if tenc and len(tenc) >= 32:
per_sample_iv_size = tenc[15]
kid = tenc[16:32]
info.per_sample_iv_size = per_sample_iv_size
info.kid = kid
logger.debug(
f"tenc (desc {desc_idx}): per_sample_iv_size={per_sample_iv_size}"
)
# If per_sample_iv_size is 0, extract constant IV
if per_sample_iv_size == 0 and len(tenc) > 32:
constant_iv_size = tenc[32]
if len(tenc) >= 33 + constant_iv_size:
info.constant_iv = tenc[33 : 33 + constant_iv_size]
logger.debug(
f"Constant IV (desc {desc_idx}): {info.constant_iv.hex()}"
)
encryption_info_per_desc[desc_idx] = info
entry_offset += entry_size
return encryption_info_per_desc if encryption_info_per_desc else None
def _extract_audio_track_id(moov_data: bytes) -> int:
"""Extract the track ID of the audio track from the moov box.
@@ -1302,3 +1580,210 @@ async def decrypt_file(
decrypted_data,
input_path, # Pass original path for codec info extraction
)
def decrypt_samples_hex(
samples: List[SampleInfo],
keys: dict,
encryption_info: EncryptionInfo,
encryption_info_per_desc: Optional[dict] = None,
) -> bytes:
"""Decrypt samples using hex AES keys (no wrapper needed).
Supports both CENC (AES-128-CTR) and CBCS (AES-128-CBC) schemes.
Args:
samples: List of SampleInfo with data, desc_index, iv, subsamples.
keys: Mapping of desc_index (int) → AES key (16 bytes, raw).
encryption_info: EncryptionInfo with scheme_type, constant_iv, etc.
encryption_info_per_desc: Optional dict mapping desc_index → EncryptionInfo
(used when different stsd entries have different params).
Returns:
Concatenated decrypted sample data.
"""
is_cenc = encryption_info.scheme_type == "cenc"
decrypted = bytearray()
for sample in samples:
key = keys.get(sample.desc_index)
if key is None:
# No key for this desc_index — keep data as-is (shouldn't happen)
decrypted.extend(sample.data)
continue
# Get encryption info for this sample's desc_index (if per-description info exists)
if encryption_info_per_desc and sample.desc_index in encryption_info_per_desc:
enc_info = encryption_info_per_desc[sample.desc_index]
else:
enc_info = encryption_info
if is_cenc:
# AES-128-CTR: per-sample IV from senc, zero-padded to 16 bytes
iv = sample.iv
if len(iv) < 16:
iv = iv + b"\x00" * (16 - len(iv))
cipher = AES.new(key, AES.MODE_CTR, nonce=b"", initial_value=iv)
if sample.subsamples:
plaintext = bytearray()
offset = 0
for clear_bytes, encrypted_bytes in sample.subsamples:
plaintext.extend(sample.data[offset : offset + clear_bytes])
offset += clear_bytes
plaintext.extend(
cipher.decrypt(sample.data[offset : offset + encrypted_bytes])
)
offset += encrypted_bytes
plaintext.extend(sample.data[offset:])
decrypted.extend(plaintext)
else:
decrypted.extend(cipher.decrypt(sample.data))
else:
# CBCS (AES-128-CBC): constant IV or per-sample IV
iv = sample.iv if sample.iv else enc_info.constant_iv
if len(iv) < 16:
iv = iv + b"\x00" * (16 - len(iv))
if sample.subsamples:
# For CBCS subsamples: concatenate all encrypted regions into one,
# decrypt as one CBC stream (to maintain cipher state), then split back.
# This avoids losing bytes if encrypted_bytes values aren't 16-byte aligned.
# Collect all encrypted byte ranges and encrypt content
encrypted_concat = bytearray()
subsample_sizes = (
[]
) # Track size of each encrypted region for reassembly
offset = 0
for clear_bytes, encrypted_bytes in sample.subsamples:
offset += clear_bytes
if encrypted_bytes > 0:
encrypted_concat.extend(
sample.data[offset : offset + encrypted_bytes]
)
subsample_sizes.append(encrypted_bytes)
offset += encrypted_bytes
# Decrypt concatenated regions as one CBC stream
total_enc_len = len(encrypted_concat)
decrypted_concat = bytearray()
if total_enc_len > 0:
cbc_len = total_enc_len & ~0xF
if cbc_len > 0:
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
decrypted_concat.extend(
cipher.decrypt(bytes(encrypted_concat[:cbc_len]))
)
# Any trailing unaligned bytes (shouldn't happen if file is well-formed)
if cbc_len < total_enc_len:
decrypted_concat.extend(encrypted_concat[cbc_len:])
# Reassemble with original clear/encrypted pattern
plaintext = bytearray()
dec_offset = 0
offset = 0
for clear_bytes, encrypted_bytes in sample.subsamples:
plaintext.extend(sample.data[offset : offset + clear_bytes])
offset += clear_bytes
if encrypted_bytes > 0:
plaintext.extend(
decrypted_concat[dec_offset : dec_offset + encrypted_bytes]
)
dec_offset += encrypted_bytes
offset += encrypted_bytes
plaintext.extend(sample.data[offset:])
decrypted.extend(plaintext)
else:
# Full subsample: for well-formed files, the entire sample should be
# a multiple of 16 bytes. Only truncate if misaligned (unexpected).
sample_len = len(sample.data)
if sample_len % 16 == 0:
# Data is properly 16-byte aligned, decrypt as-is
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
decrypted.extend(cipher.decrypt(sample.data))
else:
# Data is not aligned (unexpected case) - truncate carefully
truncated_len = sample_len & ~0xF
if truncated_len > 0:
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
decrypted.extend(cipher.decrypt(sample.data[:truncated_len]))
# Keep unaligned tail bytes as clear (unencrypted)
decrypted.extend(sample.data[truncated_len:])
else:
# Less than 16 bytes - cannot decrypt with CBC, keep as-is
decrypted.extend(sample.data)
logger.debug(
f"Decrypted {len(samples)} samples ({len(decrypted)} bytes) with hex keys"
)
return bytes(decrypted)
async def decrypt_file_hex(
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool = False,
) -> None:
"""Decrypt an encrypted MP4 file using a hex AES key (no wrapper/mp4decrypt).
This replaces the mp4decrypt + remux pipeline with pure-Python decryption:
1. Extract samples and encryption info from MP4
2. Decrypt samples using AES (CTR for cenc / CBC for cbcs)
3. Write clean decrypted M4A output
Args:
input_path: Path to encrypted MP4 file.
output_path: Path for decrypted output file.
decryption_key: Hex-encoded 128-bit AES key (32 hex chars).
legacy: If True, treat as legacy AAC (cenc, single key).
"""
logger.debug(f"Hex-key decrypt: {input_path} -> {output_path}")
# Extract samples (run in thread to not block)
song_info = await asyncio.to_thread(extract_song, input_path)
# Build key mapping: desc_index → raw AES key bytes
track_key = bytes.fromhex(decryption_key)
if legacy:
# Legacy AAC (cenc): single key for all samples (all desc_index 0)
keys = {0: track_key}
else:
# Non-legacy (cbcs): two sample descriptions
# desc_index 0 → DEFAULT_SONG_DECRYPTION_KEY (prefetch samples)
# desc_index 1 → track key (from Widevine CDM)
keys = {0: DEFAULT_SONG_DECRYPTION_KEY, 1: track_key}
# Use encryption info from the file (fall back to sensible defaults)
enc_info = song_info.encryption_info or EncryptionInfo(
scheme_type="cenc" if legacy else "cbcs"
)
# Try to extract per-description encryption info (for non-legacy files)
# This handles cases where desc_index 0 and 1 have different encryption parameters
enc_info_per_desc = None
if song_info.moov_data and not legacy:
enc_info_per_desc = await asyncio.to_thread(
_extract_encryption_info_per_stsd, song_info.moov_data
)
if enc_info_per_desc:
logger.debug(
f"Found per-description encryption info: {list(enc_info_per_desc.keys())}"
)
# Decrypt
decrypted_data = decrypt_samples_hex(
song_info.samples, keys, enc_info, enc_info_per_desc
)
# Write output (preserves original metadata boxes)
await asyncio.to_thread(
write_decrypted_m4a,
output_path,
song_info,
decrypted_data,
input_path,
)
-1
View File
@@ -1,6 +1,5 @@
import re
DEFAULT_SONG_DECRYPTION_KEY = "32b8ade1769e26b1ffb8986352793fc6"
TEMP_PATH_TEMPLATE = "gamdl_temp_{}"
ILLEGAL_CHARS_RE = r'[\\/:*?"<>|;]'
ILLEGAL_CHAR_REPLACEMENT = "_"
+86 -61
View File
@@ -5,6 +5,7 @@ from pathlib import Path
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from ..api.exceptions import ApiError
from ..interface import AppleMusicInterface
from ..utils import safe_gather
from .constants import (
@@ -357,9 +358,14 @@ class AppleMusicDownloader:
download_items = []
if url_type in ARTIST_MEDIA_TYPE:
artist_response = await self.interface.apple_music_api.get_artist(
id,
)
try:
artist_response = await self.interface.apple_music_api.get_artist(
id,
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if artist_response is None:
return None
@@ -369,7 +375,12 @@ class AppleMusicDownloader:
)
if url_type in SONG_MEDIA_TYPE:
song_respose = await self.interface.apple_music_api.get_song(id)
try:
song_respose = await self.interface.apple_music_api.get_song(id)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if song_respose is None:
return None
@@ -379,12 +390,17 @@ class AppleMusicDownloader:
)
if url_type in ALBUM_MEDIA_TYPE:
if is_library:
album_response = await self.interface.apple_music_api.get_library_album(
id
)
else:
album_response = await self.interface.apple_music_api.get_album(id)
try:
if is_library:
album_response = (
await self.interface.apple_music_api.get_library_album(id)
)
else:
album_response = await self.interface.apple_music_api.get_album(id)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if album_response is None:
return None
@@ -394,14 +410,19 @@ class AppleMusicDownloader:
)
if url_type in PLAYLIST_MEDIA_TYPE:
if is_library:
playlist_response = (
await self.interface.apple_music_api.get_library_playlist(id)
)
else:
playlist_response = await self.interface.apple_music_api.get_playlist(
id
)
try:
if is_library:
playlist_response = (
await self.interface.apple_music_api.get_library_playlist(id)
)
else:
playlist_response = (
await self.interface.apple_music_api.get_playlist(id)
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if playlist_response is None:
return None
@@ -411,9 +432,14 @@ class AppleMusicDownloader:
)
if url_type in MUSIC_VIDEO_MEDIA_TYPE:
music_video_response = await self.interface.apple_music_api.get_music_video(
id
)
try:
music_video_response = (
await self.interface.apple_music_api.get_music_video(id)
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if music_video_response is None:
return None
@@ -423,7 +449,14 @@ class AppleMusicDownloader:
)
if url_type in UPLOADED_VIDEO_MEDIA_TYPE:
uploaded_video = await self.interface.apple_music_api.get_uploaded_video(id)
try:
uploaded_video = (
await self.interface.apple_music_api.get_uploaded_video(id)
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if uploaded_video is None:
return None
@@ -476,53 +509,45 @@ class AppleMusicDownloader:
):
raise MediaFileExists(download_item.final_path)
if download_item.media_metadata["type"] in {
*SONG_MEDIA_TYPE,
*MUSIC_VIDEO_MEDIA_TYPE,
}:
if (
self.base_downloader.download_mode == DownloadMode.NM3U8DLRE
and not self.base_downloader.full_nm3u8dlre_path
):
raise ExecutableNotFound("N_m3u8DL-RE")
if download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE:
if (
not self.base_downloader.use_wrapper
or download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE
or self.song_downloader.codec.is_legacy()
self.music_video_downloader.remux_mode == RemuxMode.FFMPEG
and not self.base_downloader.full_ffmpeg_path
):
if (
self.base_downloader.remux_mode == RemuxMode.FFMPEG
and not self.base_downloader.full_ffmpeg_path
):
raise ExecutableNotFound("ffmpeg")
if (
self.base_downloader.remux_mode == RemuxMode.MP4BOX
and not self.base_downloader.full_mp4box_path
):
raise ExecutableNotFound("MP4Box")
if (
download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE
or self.base_downloader.remux_mode == RemuxMode.MP4BOX
) and not self.base_downloader.full_mp4decrypt_path:
raise ExecutableNotFound("mp4decrypt")
raise ExecutableNotFound("ffmpeg")
if (
self.base_downloader.download_mode == DownloadMode.NM3U8DLRE
and not self.base_downloader.full_nm3u8dlre_path
self.music_video_downloader.remux_mode == RemuxMode.MP4BOX
and not self.base_downloader.full_mp4box_path
):
raise ExecutableNotFound("N_m3u8DL-RE")
raise ExecutableNotFound("MP4Box")
if (
not download_item.stream_info
or not download_item.stream_info.audio_track
or not download_item.stream_info.audio_track.stream_url
or (
(
not download_item.decryption_key
or not download_item.decryption_key.audio_track
or not download_item.decryption_key.audio_track.key
)
and not self.base_downloader.use_wrapper
download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE
or self.music_video_downloader.remux_mode == RemuxMode.MP4BOX
) and not self.base_downloader.full_mp4decrypt_path:
raise ExecutableNotFound("mp4decrypt")
if (
not download_item.stream_info
or not download_item.stream_info.audio_track
or not download_item.stream_info.audio_track.stream_url
or (
(
not download_item.decryption_key
or not download_item.decryption_key.audio_track
or not download_item.decryption_key.audio_track.key
)
):
raise FormatNotAvailable(download_item.media_metadata["id"])
and not self.base_downloader.use_wrapper
)
):
raise FormatNotAvailable(download_item.media_metadata["id"])
if download_item.media_metadata["type"] in SONG_MEDIA_TYPE:
await self.song_downloader.download(download_item)
+1 -3
View File
@@ -12,7 +12,7 @@ from ..interface.enums import CoverFormat
from ..interface.types import MediaTags, PlaylistTags
from ..utils import CustomStringFormatter, async_subprocess
from .constants import ILLEGAL_CHAR_REPLACEMENT, ILLEGAL_CHARS_RE, TEMP_PATH_TEMPLATE
from .enums import DownloadMode, RemuxMode
from .enums import DownloadMode
from .hardcoded_wvd import HARDCODED_WVD
@@ -32,7 +32,6 @@ class AppleMusicBaseDownloader:
use_wrapper: bool = False,
wrapper_decrypt_ip: str = "127.0.0.1:10020",
download_mode: DownloadMode = DownloadMode.YTDLP,
remux_mode: RemuxMode = RemuxMode.FFMPEG,
cover_format: CoverFormat = CoverFormat.JPG,
album_folder_template: str = "{album_artist}/{album}",
compilation_folder_template: str = "Compilations/{album}",
@@ -60,7 +59,6 @@ class AppleMusicBaseDownloader:
self.use_wrapper = use_wrapper
self.wrapper_decrypt_ip = wrapper_decrypt_ip
self.download_mode = download_mode
self.remux_mode = remux_mode
self.cover_format = cover_format
self.album_folder_template = album_folder_template
self.compilation_folder_template = compilation_folder_template
+8 -2
View File
@@ -1,6 +1,6 @@
from pathlib import Path
from ..interface.enums import MusicVideoCodec, MusicVideoResolution
from ..interface.enums import CoverFormat, MusicVideoCodec, MusicVideoResolution
from ..interface.interface_music_video import AppleMusicMusicVideoInterface
from ..interface.types import DecryptionKeyAv
from ..utils import async_subprocess
@@ -18,12 +18,14 @@ class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
MusicVideoCodec.H264,
MusicVideoCodec.H265,
],
remux_mode: RemuxMode = RemuxMode.FFMPEG,
remux_format: RemuxFormatMusicVideo = RemuxFormatMusicVideo.M4V,
resolution: MusicVideoResolution = MusicVideoResolution.R1080P,
):
self.__dict__.update(base_downloader.__dict__)
self.interface = interface
self.codec_priority = codec_priority
self.remux_mode = remux_mode
self.remux_format = remux_format
self.resolution = resolution
@@ -271,7 +273,11 @@ class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
download_item.decryption_key,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
cover_bytes = (
await self.interface.get_cover_bytes(download_item.cover_url)
if self.cover_format != CoverFormat.RAW
else None
)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
+29 -125
View File
@@ -1,13 +1,10 @@
from pathlib import Path
from ..interface.enums import SongCodec, SyncedLyricsFormat
from ..interface.enums import CoverFormat, SongCodec, SyncedLyricsFormat
from ..interface.interface_song import AppleMusicSongInterface
from ..interface.types import DecryptionKeyAv
from ..utils import async_subprocess
from .amdecrypt import decrypt_file
from .constants import DEFAULT_SONG_DECRYPTION_KEY
from .amdecrypt import decrypt_file, decrypt_file_hex
from .downloader_base import AppleMusicBaseDownloader
from .enums import RemuxMode
from .types import DownloadItem
@@ -142,93 +139,6 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
return download_item
def fix_key_id(self, input_path: str):
count = 0
with open(input_path, "rb+") as file:
while data := file.read(4096):
pos = file.tell()
i = 0
while tenc := max(0, data.find(b"tenc", i)):
kid = tenc + 12
file.seek(max(0, pos - 4096) + kid, 0)
file.write(bytes.fromhex(f"{count:032}"))
count += 1
i = kid + 1
file.seek(pos, 0)
async def remux_mp4box(self, input_path: str, output_path: str):
await async_subprocess(
self.full_mp4box_path,
"-quiet",
"-add",
input_path,
"-itags",
"artist=placeholder",
"-keep-utc",
"-new",
output_path,
silent=self.silent,
)
async def remux_ffmpeg(
self,
input_path: str,
output_path: str,
decryption_key: str = None,
):
if decryption_key:
key = [
"-decryption_key",
decryption_key,
]
else:
key = []
await async_subprocess(
self.full_ffmpeg_path,
"-loglevel",
"error",
"-y",
*key,
"-i",
input_path,
"-c",
"copy",
"-movflags",
"+faststart",
output_path,
silent=self.silent,
)
async def decrypt_mp4decrypt(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool,
):
if legacy:
keys = [
"--key",
f"1:{decryption_key}",
]
else:
self.fix_key_id(input_path)
keys = [
"--key",
"0" * 31 + "1" + f":{decryption_key}",
"--key",
"0" * 32 + f":{DEFAULT_SONG_DECRYPTION_KEY}",
]
await async_subprocess(
self.full_mp4decrypt_path,
*keys,
input_path,
output_path,
silent=self.silent,
)
async def decrypt_amdecrypt(
self,
input_path: str,
@@ -244,46 +154,43 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
output_path,
)
async def decrypt_amdecrypt_hex(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool = False,
) -> None:
await decrypt_file_hex(
input_path,
output_path,
decryption_key,
legacy=legacy,
)
async def stage(
self,
encrypted_path: str,
decrypted_path: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
codec: SongCodec,
media_id: str,
fairplay_key: str,
):
if codec.is_legacy() and self.remux_mode == RemuxMode.FFMPEG:
await self.remux_ffmpeg(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
)
elif codec.is_legacy() or not self.use_wrapper:
await self.decrypt_mp4decrypt(
encrypted_path,
decrypted_path,
decryption_key.audio_track.key,
codec.is_legacy(),
)
if self.remux_mode == RemuxMode.FFMPEG:
await self.remux_ffmpeg(
decrypted_path,
staged_path,
)
else:
await self.remux_mp4box(
decrypted_path,
staged_path,
)
else:
if self.use_wrapper:
await self.decrypt_amdecrypt(
encrypted_path,
staged_path,
media_id,
fairplay_key,
)
else:
await self.decrypt_amdecrypt_hex(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
legacy=codec.is_legacy(),
)
def get_lyrics_synced_path(self, final_path: str) -> str:
return str(Path(final_path).with_suffix("." + self.synced_lyrics_format.value))
@@ -321,15 +228,8 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
encrypted_path,
)
decrypted_path = self.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"decrypted",
".m4a",
)
await self.stage(
encrypted_path,
decrypted_path,
download_item.staged_path,
download_item.decryption_key,
self.codec,
@@ -337,7 +237,11 @@ class AppleMusicSongDownloader(AppleMusicBaseDownloader):
download_item.stream_info.audio_track.fairplay_key,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
cover_bytes = (
await self.interface.get_cover_bytes(download_item.cover_url)
if self.cover_format != CoverFormat.RAW
else None
)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
@@ -1,6 +1,6 @@
from pathlib import Path
from ..interface.enums import UploadedVideoQuality
from ..interface.enums import CoverFormat, UploadedVideoQuality
from ..interface.interface_uploaded_video import AppleMusicUploadedVideoInterface
from .downloader_base import AppleMusicBaseDownloader
from .types import DownloadItem
@@ -95,7 +95,11 @@ class AppleMusicUploadedVideoDownloader(AppleMusicBaseDownloader):
download_item.staged_path,
)
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
cover_bytes = (
await self.interface.get_cover_bytes(download_item.cover_url)
if self.cover_format != CoverFormat.RAW
else None
)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
+5 -5
View File
@@ -79,7 +79,8 @@ class AppleMusicInterface:
cover_url_template = self._get_raw_cover_url(
metadata["attributes"]["artwork"]["url"]
)
cover_url_template = metadata["attributes"]["artwork"]["url"]
else:
cover_url_template = metadata["attributes"]["artwork"]["url"]
logger.debug(f"Cover URL template: {cover_url_template}")
return cover_url_template
@@ -102,9 +103,9 @@ class AppleMusicInterface:
cover_format: CoverFormat,
) -> str:
cover_url = re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
r"/\{w\}x\{h\}([a-z]{2})\.jpg",
(
f"{cover_size}x{cover_size}bb.{cover_format.value}"
f"/{cover_size}x{cover_size}bb.{cover_format.value}"
if cover_format != CoverFormat.RAW
else ""
),
@@ -123,12 +124,11 @@ class AppleMusicInterface:
if cover_format != CoverFormat.RAW:
return f".{cover_format.value}"
cover_url = self.get_cover_url(cover_url)
cover_bytes = await self.get_cover_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(BytesIO(self.get_cover_bytes(cover_url)))
image_obj = Image.open(BytesIO(await self.get_cover_bytes(cover_url)))
image_format = image_obj.format.lower()
return IMAGE_FILE_EXTENSION_MAP.get(
image_format,
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "gamdl"
version = "2.8.7"
version = "2.9"
description = "A command-line app for downloading Apple Music songs, music videos and post videos."
readme = "README.md"
license = "MIT"
Generated
+1 -1
View File
@@ -214,7 +214,7 @@ wheels = [
[[package]]
name = "gamdl"
version = "2.8.7"
version = "2.9"
source = { virtual = "." }
dependencies = [
{ name = "async-lru" },