Compare commits

...

49 Commits

Author SHA1 Message Date
glomatico b0c5335767 Bump version to 3.7.4 in __init__.py, pyproject.toml, and uv.lock 2026-06-12 20:51:52 -03:00
glomatico 69c2a8a063 Refactor GamdlApiResponseError to accept Any type for content and improve message formatting 2026-06-12 20:51:11 -03:00
Rafael Moraes fb143ad1b4 Merge pull request #315 from nirbhaykulkarni/fix/token-extraction-and-cover-timeout
Fix token extraction and cover art timeout
2026-06-12 20:46:11 -03:00
nirbhaykulkarni b66c06a9cb Fix token extraction and cover art timeout
- Search non-legacy index JS bundle for token (Apple moved it from index-legacy)
- Broaden JWT regex from eyJh to full 3-part JWT pattern (tokens now start with eyJ0)
- Add 30s timeout and follow_redirects to cover art fetch to avoid ConnectTimeout
2026-06-12 21:39:18 +05:30
glomatico a9e75384f0 Add method to switch m3u8 master URL to default and update playback handling 2026-06-05 22:07:31 -03:00
Rafael Moraes d88dbe5bb6 Bump version to 3.7.3 2026-05-28 17:28:47 -03:00
Rafael Moraes 8398d9c65f Handle missing playParams in metadata 2026-05-28 17:28:22 -03:00
Rafael Moraes c6bce4b2c1 Bump version to 3.7.2 2026-05-28 17:24:21 -03:00
Rafael Moraes f54ab12408 Guard playParams access to avoid KeyError 2026-05-28 17:23:35 -03:00
Rafael Moraes 817479d807 Use uncensored names and add sort fields 2026-05-28 17:20:44 -03:00
Rafael Moraes d072f322db Remove mp4.clear() call in AppleMusic downloader 2026-05-24 14:53:48 -03:00
Rafael Moraes a62ac76639 Rename SONG_CODEC_FLAVOR_MAP to MEDIA_CODEC_FLAVOR_MAP 2026-05-24 14:39:23 -03:00
Rafael Moraes 31b143d870 Add debug logging for m3u8 master URL 2026-05-24 14:38:32 -03:00
Rafael Moraes 387861bb2f Support file-backed samples and streaming decrypt 2026-05-24 14:21:21 -03:00
Rafael Moraes 24fb9bddb9 Make MusicVideoCodec.fourcc a property 2026-05-24 14:12:46 -03:00
Rafael Moraes 30ca108b80 Return optional fourcc for MusicVideoCodec 2026-05-24 12:59:38 -03:00
Rafael Moraes 1d00e74ec6 Use yt-dlp HlsFD/HttpFD and handle failures 2026-05-24 12:57:13 -03:00
Rafael Moraes bb511de552 Use download_stream instead of _download_ytdlp_async 2026-05-24 12:47:45 -03:00
Rafael Moraes 15c1bc64dd Make MusicVideoCodec.fourcc a property 2026-05-24 11:49:37 -03:00
Rafael Moraes 4f910c8e8a Use 'codec' key instead of 'formats' in error 2026-05-23 23:02:23 -03:00
Rafael Moraes ff3dcda54c Bump version to 3.7.1 2026-05-23 23:01:20 -03:00
Rafael Moraes 7ac3322839 Handle missing webplayback in song stream info 2026-05-23 22:59:00 -03:00
Rafael Moraes 740cad2ee0 Refactor song interface stream logic and imports 2026-05-23 22:57:28 -03:00
Rafael Moraes 5a41dfbdaa Handle missing m3u8 master URL 2026-05-23 22:54:48 -03:00
Rafael Moraes 141d9cd654 Pass codec through music video stream selection 2026-05-23 22:53:43 -03:00
Rafael Moraes 50f82b5de2 Refactor music video stream fetching 2026-05-23 22:41:28 -03:00
Rafael Moraes eb9caff85c Await get_tags_from_asset_info call 2026-05-23 22:35:05 -03:00
Rafael Moraes 73e0b4b48d Mark uploaded Apple Music video as DRM-free 2026-05-23 16:05:37 -03:00
Rafael Moraes 8f82697c14 Bump package version to 3.7 2026-05-23 16:04:25 -03:00
Rafael Moraes 4650391be3 Add FFmpeg requirement and --ffmpeg-path option to README 2026-05-23 16:02:26 -03:00
Rafael Moraes 0519adf693 Clarify supported URL types in README 2026-05-23 15:59:41 -03:00
Rafael Moraes 4fc91bac9f Add get_m3u8_master_url helper and use it 2026-05-23 15:58:20 -03:00
Rafael Moraes cb367049f1 Remove get_tags method from AppleMusicSongInterface 2026-05-23 15:56:42 -03:00
Rafael Moraes 34357ad31e Handle library music videos and fix logging id 2026-05-23 15:54:13 -03:00
Rafael Moraes a7140cb860 Use .get for playParams isLibrary checks 2026-05-23 15:50:24 -03:00
Rafael Moraes aa14693924 Add drm_free and is_library flags to types 2026-05-23 15:47:49 -03:00
Rafael Moraes 76a7c792cd Use API response 'id' for media.media_id 2026-05-23 15:47:39 -03:00
Rafael Moraes c75249bc2d Support Apple Music library songs streaming 2026-05-23 15:47:29 -03:00
Rafael Moraes 001a502a5c Support Apple Music library items 2026-05-23 15:47:12 -03:00
Rafael Moraes 1eba432153 Handle DRM-free tracks in AppleMusic downloader 2026-05-23 15:45:09 -03:00
Rafael Moraes 622661a679 Support songs/music-videos in library URL regex 2026-05-23 15:44:58 -03:00
Rafael Moraes 8200ee0dd1 Refactor AppleMusicBaseInterface metadata parsing 2026-05-23 15:44:48 -03:00
Rafael Moraes a8bf884d8f Handle m3u8 and HttpFD downloads in ytdlp 2026-05-23 15:44:23 -03:00
Rafael Moraes 6d8ecf65b6 Support library tracks in get_webplayback 2026-05-23 15:44:12 -03:00
Rafael Moraes 03fb4a255e Add library song/video APIs and params 2026-05-23 14:42:55 -03:00
Rafael Moraes f8ec2367af Add include param to library endpoints 2026-05-23 14:18:16 -03:00
Rafael Moraes b5432d1344 Add library endpoints and client methods 2026-05-23 13:37:00 -03:00
Rafael Moraes bd59bb7c98 Add ffmpeg_path CLI option and pass to downloader 2026-05-23 12:57:07 -03:00
Rafael Moraes 92b8220c71 Add ffmpeg path option to downloader 2026-05-23 12:56:54 -03:00
21 changed files with 907 additions and 382 deletions
+7 -4
View File
@@ -58,6 +58,8 @@ Use [N_m3u8DL-RE](https://github.com/nilaoda/N_m3u8DL-RE/releases/latest) as a f
If the executable is not available in your system PATH, set its location with `--nm3u8dlre-path` or `nm3u8dlre_path`.
N_m3u8DL-RE also needs FFmpeg. If the FFmpeg executable is not available in your system PATH, set its location with `--ffmpeg-path` or `ffmpeg_path`.
## 📦 Installation
1. **Install Gamdl via pip:**
@@ -81,10 +83,10 @@ gamdl [OPTIONS] URLS...
### Supported URL Types
- Songs
- Albums (Public/Library)
- Playlists (Public/Library)
- Music Videos
- Songs (Catalog/Library)
- Albums (Catalog/Library)
- Playlists (Catalog/Library)
- Music Videos (Catalog/Library)
- Artists
- Post Videos
- Apple Music Classical
@@ -167,6 +169,7 @@ The file is created automatically on first run. Command-line arguments override
| `--output-path`, `-o` | Output directory path | `./Apple Music` |
| `--temp-path` | Temporary directory path | `.` |
| `--nm3u8dlre-path` | N_m3u8DL-RE executable path | `N_m3u8DL-RE` |
| `--ffmpeg-path` | FFmpeg executable path | `ffmpeg` |
| `--download-mode` | Download mode | `ytdlp` |
| **Template Options** | | |
| `--album-folder-template` | Album folder template | `{album_artist}/{album}` |
+1 -1
View File
@@ -1 +1 @@
__version__ = "3.6"
__version__ = "3.7.4"
+154 -7
View File
@@ -15,10 +15,16 @@ from .constants import (
APPLE_MUSIC_HOMEPAGE_URL,
APPLE_MUSIC_LIBRARY_ALBUM_API_URI,
APPLE_MUSIC_LIBRARY_PLAYLIST_API_URI,
APPLE_MUSIC_LIBRARY_PLAYLISTS_API_URI,
APPLE_MUSIC_LICENSE_API_URL,
APPLE_MUSIC_LIBRARY_MUSIC_VIDEO_API_URI,
APPLE_MUSIC_MUSIC_VIDEO_API_URI,
APPLE_MUSIC_LIBRARY_ALBUMS_API_URI,
APPLE_MUSIC_PLAYLIST_API_URI,
APPLE_MUSIC_SEARCH_API_URI,
APPLE_MUSIC_LIBRARY_MUSIC_VIDEOS_API_URI,
APPLE_MUSIC_LIBRARY_SONG_API_URI,
APPLE_MUSIC_LIBRARY_SONGS_API_URI,
APPLE_MUSIC_SONG_API_URI,
APPLE_MUSIC_UPLOADED_VIDEO_API_URL,
APPLE_MUSIC_WEBPLAYBACK_API_URL,
@@ -87,7 +93,7 @@ class AppleMusicApi:
)
index_js_uri_match = re.search(
r"/(assets/index-legacy[~-][^/\"]+\.js)",
r"/(assets/index[~-][^/\"]+\.js)",
home_page,
)
if not index_js_uri_match:
@@ -110,7 +116,7 @@ class AppleMusicApi:
status_code=response.status_code if response is not None else None,
)
token_match = re.search('(?=eyJh)(.*?)(?=")', index_js_page)
token_match = re.search(r'"(eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+)"', index_js_page)
if not token_match:
raise GamdlApiResponseError("Error finding token in index.js page")
token = token_match.group(1)
@@ -426,9 +432,54 @@ class AppleMusicApi:
return artist
async def get_library_song(
self,
song_id: str,
include: str = "catalog",
extend: str = "extendedAssetUrls",
) -> dict:
log = logger.bind(action="get_library_song", song_id=song_id)
song = await self._amp_request(
APPLE_MUSIC_LIBRARY_SONG_API_URI.format(
song_id=song_id,
),
{
"include": include,
"extend": extend,
},
)
log.debug("success", song=song)
return song
async def get_library_music_video(
self,
music_video_id: str,
include: str = "catalog",
) -> dict:
log = logger.bind(
action="get_library_music_video", music_video_id=music_video_id
)
music_video = await self._amp_request(
APPLE_MUSIC_LIBRARY_MUSIC_VIDEO_API_URI.format(
music_video_id=music_video_id,
),
{
"include": include,
},
)
log.debug("success", music_video=music_video)
return music_video
async def get_library_album(
self,
album_id: str,
include: str = "catalog",
extend: str = "extendedAssetUrls",
) -> dict:
log = logger.bind(action="get_library_album", album_id=album_id)
@@ -438,6 +489,7 @@ class AppleMusicApi:
album_id=album_id,
),
{
"include": include,
"extend": extend,
},
)
@@ -449,7 +501,7 @@ class AppleMusicApi:
async def get_library_playlist(
self,
playlist_id: str,
include: str = "tracks",
include: str = "catalog,tracks",
limit: int = 100,
extend: str = "extendedAssetUrls",
) -> dict:
@@ -470,6 +522,92 @@ class AppleMusicApi:
return playlist
async def get_library_songs(
self,
limit: int = 100,
offset: int = 0,
include: str = "catalog",
extend: str = "extendedAssetUrls",
) -> dict:
log = logger.bind(action="get_library_songs")
library_songs = await self._amp_request(
APPLE_MUSIC_LIBRARY_SONGS_API_URI,
{
"limit": limit,
"offset": offset,
"include": include,
"extend": extend,
},
)
log.debug("success", library_songs=library_songs)
return library_songs
async def get_library_music_videos(
self,
limit: int = 100,
offset: int = 0,
include: str = "catalog",
) -> dict:
log = logger.bind(action="get_library_music_videos")
library_music_videos = await self._amp_request(
APPLE_MUSIC_LIBRARY_MUSIC_VIDEOS_API_URI,
{
"limit": limit,
"offset": offset,
"include": include,
},
)
log.debug("success", library_music_videos=library_music_videos)
return library_music_videos
async def get_library_albums(
self,
limit: int = 100,
offset: int = 0,
include: str = "catalog",
) -> dict:
log = logger.bind(action="get_library_albums")
library_albums = await self._amp_request(
APPLE_MUSIC_LIBRARY_ALBUMS_API_URI,
{
"limit": limit,
"offset": offset,
"include": include,
},
)
log.debug("success", library_albums=library_albums)
return library_albums
async def get_library_playlists(
self,
limit: int = 100,
offset: int = 0,
include: str = "catalog",
) -> dict:
log = logger.bind(action="get_library_playlists")
library_playlists = await self._amp_request(
APPLE_MUSIC_LIBRARY_PLAYLISTS_API_URI,
{
"limit": limit,
"offset": offset,
"include": include,
},
)
log.debug("success", library_playlists=library_playlists)
return library_playlists
async def get_search_results(
self,
term: str,
@@ -531,17 +669,26 @@ class AppleMusicApi:
async def get_webplayback(
self,
track_id: str,
is_library: bool = False,
) -> dict:
log = logger.bind(action="get_webplayback", track_id=track_id)
response = None
if is_library:
request_body = {
"universalLibraryId": track_id,
}
else:
request_body = {
"salableAdamId": track_id,
}
request_body["language"] = self.language
try:
response = await self.client.post(
APPLE_MUSIC_WEBPLAYBACK_API_URL,
json={
"salableAdamId": track_id,
"language": self.language,
},
json=request_body,
)
response.raise_for_status()
webplayback = response.json()
+8
View File
@@ -14,9 +14,17 @@ APPLE_MUSIC_UPLOADED_VIDEO_API_URL = (
APPLE_MUSIC_ALBUM_API_URI = "/v1/catalog/{storefront}/albums/{album_id}"
APPLE_MUSIC_PLAYLIST_API_URI = "/v1/catalog/{storefront}/playlists/{playlist_id}"
APPLE_MUSIC_ARTIST_API_URI = "/v1/catalog/{storefront}/artists/{artist_id}"
APPLE_MUSIC_LIBRARY_SONG_API_URI = "/v1/me/library/songs/{song_id}"
APPLE_MUSIC_LIBRARY_MUSIC_VIDEO_API_URI = (
"/v1/me/library/music-videos/{music_video_id}"
)
APPLE_MUSIC_LIBRARY_ALBUM_API_URI = "/v1/me/library/albums/{album_id}"
APPLE_MUSIC_LIBRARY_PLAYLIST_API_URI = "/v1/me/library/playlists/{playlist_id}"
APPLE_MUSIC_SEARCH_API_URI = "/v1/catalog/{storefront}/search"
APPLE_MUSIC_LIBRARY_SONGS_API_URI = "/v1/me/library/songs"
APPLE_MUSIC_LIBRARY_MUSIC_VIDEOS_API_URI = "/v1/me/library/music-videos"
APPLE_MUSIC_LIBRARY_ALBUMS_API_URI = "/v1/me/library/albums"
APPLE_MUSIC_LIBRARY_PLAYLISTS_API_URI = "/v1/me/library/playlists"
APPLE_MUSIC_WEBPLAYBACK_API_URL = (
"https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
+15 -3
View File
@@ -1,3 +1,6 @@
import json
from typing import Any
from ..utils import GamdlError
@@ -9,7 +12,7 @@ class GamdlApiResponseError(GamdlApiError):
def __init__(
self,
message: str,
content: str | None = None,
content: Any | None = None,
status_code: int | None = None,
):
self.message = message
@@ -19,7 +22,16 @@ class GamdlApiResponseError(GamdlApiError):
if status_code is not None:
message = f"{message} (Status code: {status_code})"
if content:
message += f": {content}"
if content is not None:
if isinstance(content, str):
content_text = content
else:
try:
content_text = json.dumps(content)
except TypeError:
content_text = str(content)
if content_text:
message += f": {content_text}"
super().__init__(message)
+1
View File
@@ -176,6 +176,7 @@ async def main(config: CliConfig):
output_path=config.output_path,
temp_path=config.temp_path,
nm3u8dlre_path=config.nm3u8dlre_path,
ffmpeg_path=config.ffmpeg_path,
download_mode=config.download_mode,
album_folder_template=config.album_folder_template,
compilation_folder_template=config.compilation_folder_template,
+8
View File
@@ -313,6 +313,14 @@ class CliConfig:
default=base_downloader_sig.parameters["nm3u8dlre_path"].default,
),
]
ffmpeg_path: Annotated[
str,
option(
"--ffmpeg-path",
help="FFmpeg executable path",
default=base_downloader_sig.parameters["ffmpeg_path"].default,
),
]
download_mode: Annotated[
DownloadMode,
option(
+309 -138
View File
@@ -128,6 +128,40 @@ def _sample_size(sample: SampleInfo) -> int:
return sample.size or len(sample.data)
def _sample_data(sample: SampleInfo) -> bytes:
"""Return sample payload bytes, loading file-backed samples on demand."""
if sample.data:
return sample.data
if sample.data_path and sample.size:
with open(sample.data_path, "rb") as f:
f.seek(sample.data_offset)
data = f.read(sample.size)
if len(data) != sample.size:
raise IOError(
f"unexpected EOF while reading sample at {sample.data_offset} "
f"from {sample.data_path}"
)
return data
return sample.data
def _with_sample_data(sample: SampleInfo, data: bytes) -> SampleInfo:
"""Return a copy of a sample with materialized payload bytes."""
return SampleInfo(
data=data,
duration=sample.duration,
desc_index=sample.desc_index,
iv=sample.iv,
subsamples=sample.subsamples,
composition_time_offset=sample.composition_time_offset,
sample_flags=sample.sample_flags,
is_sync=sample.is_sync,
size=sample.size or len(data),
data_path=sample.data_path,
data_offset=sample.data_offset,
)
def _decrypt_cbcs_sample_with_key(
sample: SampleInfo, key: bytes, enc_info: EncryptionInfo
) -> bytes:
@@ -262,6 +296,8 @@ class SampleInfo:
sample_flags: int = 0
is_sync: bool = True
size: int = 0
data_path: Optional[str] = None
data_offset: int = 0
@dataclass
@@ -361,7 +397,11 @@ def find_box(data: bytes, box_path: List[str]) -> Optional[bytes]:
return f.read()
def extract_song(input_path: str, handler_type: bytes = b"soun") -> SongInfo:
def extract_song(
input_path: str,
handler_type: bytes = b"soun",
file_backed_samples: bool = False,
) -> SongInfo:
"""
Extract media samples and metadata from encrypted MP4 file.
@@ -370,38 +410,78 @@ def extract_song(input_path: str, handler_type: bytes = b"soun") -> SongInfo:
- Individual audio samples from mdat boxes
- Sample durations and description indices from moof boxes
"""
with open(input_path, "rb") as f:
raw_data = f.read()
song_info = SongInfo(handler_type=handler_type)
# First pass: collect all top-level boxes
boxes = []
offset = 0
while offset < len(raw_data) - 8:
size = struct.unpack(">I", raw_data[offset : offset + 4])[0]
box_type = raw_data[offset + 4 : offset + 8].decode("ascii", errors="replace")
if file_backed_samples:
file_size = os.path.getsize(input_path)
with open(input_path, "rb") as f:
offset = 0
while offset + 8 <= file_size:
f.seek(offset)
header = f.read(8)
if len(header) < 8:
break
size = struct.unpack(">I", header[:4])[0]
box_type = header[4:8].decode("ascii", errors="replace")
header_size = 8
if size == 0:
size = file_size - offset
elif size == 1:
ext_size = f.read(8)
if len(ext_size) < 8:
break
size = struct.unpack(">Q", ext_size)[0]
header_size = 16
if size < header_size or offset + size > file_size:
break
header_size = 8
if size == 0:
break
if size == 1:
# Extended size
if offset + 16 > len(raw_data):
data = b""
if box_type in ("ftyp", "moov", "moof"):
f.seek(offset)
data = f.read(size)
boxes.append(
{
"offset": offset,
"size": size,
"type": box_type,
"header_size": header_size,
"data": data,
}
)
offset += size
else:
with open(input_path, "rb") as f:
raw_data = f.read()
offset = 0
while offset < len(raw_data) - 8:
size = struct.unpack(">I", raw_data[offset : offset + 4])[0]
box_type = raw_data[offset + 4 : offset + 8].decode(
"ascii", errors="replace"
)
header_size = 8
if size == 0:
break
size = struct.unpack(">Q", raw_data[offset + 8 : offset + 16])[0]
header_size = 16
if size == 1:
# Extended size
if offset + 16 > len(raw_data):
break
size = struct.unpack(">Q", raw_data[offset + 8 : offset + 16])[0]
header_size = 16
boxes.append(
{
"offset": offset,
"size": size,
"type": box_type,
"header_size": header_size,
"data": raw_data[offset : offset + size],
}
)
offset += size
boxes.append(
{
"offset": offset,
"size": size,
"type": box_type,
"header_size": header_size,
"data": raw_data[offset : offset + size],
}
)
offset += size
# Extract ftyp and moov
for box in boxes:
@@ -457,7 +537,12 @@ def extract_song(input_path: str, handler_type: bytes = b"soun") -> SongInfo:
elif box["type"] == "mdat" and moof_box is not None:
# Parse this moof/mdat pair
moof_data = moof_box["data"]
mdat_data = box["data"][box["header_size"] :] # Skip mdat header
if file_backed_samples:
mdat_data = b""
mdat_data_size = box["size"] - box["header_size"]
else:
mdat_data = box["data"][box["header_size"] :] # Skip mdat header
mdat_data_size = len(mdat_data)
# Parse moof for tfhd (sample description index, defaults) and trun (entries)
_iv_size = (
@@ -475,6 +560,8 @@ def extract_song(input_path: str, handler_type: bytes = b"soun") -> SongInfo:
moof_offset=moof_box["offset"],
mdat_data_offset=box["offset"] + box["header_size"],
per_sample_iv_size=_iv_size,
mdat_data_size=mdat_data_size,
mdat_source_path=input_path if file_backed_samples else None,
)
song_info.samples.extend(samples_from_pair)
moof_box = None
@@ -507,6 +594,8 @@ def _parse_moof_mdat(
moof_offset: int = 0,
mdat_data_offset: int = 0,
per_sample_iv_size: int = 0,
mdat_data_size: Optional[int] = None,
mdat_source_path: Optional[str] = None,
) -> List[SampleInfo]:
"""Parse a moof box and extract samples from corresponding mdat.
@@ -520,6 +609,7 @@ def _parse_moof_mdat(
per_sample_iv_size: IV size per sample from tenc (0, 8, or 16).
"""
samples = []
available_mdat_bytes = len(mdat_data) if mdat_data_size is None else mdat_data_size
# Simple box parsing inside moof
offset = 8 # Skip moof header
@@ -624,8 +714,9 @@ def _parse_moof_mdat(
"sample_flags", tfhd_info["default_sample_flags"]
)
if sample_size > 0 and mdat_read_offset + sample_size <= len(
mdat_data
if (
sample_size > 0
and mdat_read_offset + sample_size <= available_mdat_bytes
):
sample_iv = b""
sample_subsamples: List[tuple] = []
@@ -634,11 +725,17 @@ def _parse_moof_mdat(
sample_subsamples = senc_entries[sample_index_in_traf][
"subsamples"
]
if mdat_source_path:
sample_data = b""
sample_data_offset = mdat_data_offset + mdat_read_offset
else:
sample_data = mdat_data[
mdat_read_offset : mdat_read_offset + sample_size
]
sample_data_offset = 0
sample = SampleInfo(
data=mdat_data[
mdat_read_offset : mdat_read_offset + sample_size
],
data=sample_data,
duration=sample_duration,
desc_index=desc_index,
iv=sample_iv,
@@ -649,6 +746,8 @@ def _parse_moof_mdat(
sample_flags=sample_flags,
is_sync=not bool(sample_flags & 0x10000),
size=sample_size,
data_path=mdat_source_path,
data_offset=sample_data_offset,
)
samples.append(sample)
mdat_read_offset += sample_size
@@ -893,6 +992,7 @@ async def decrypt_samples(
*,
use_single_content_key: bool = False,
progress_callback=None,
decrypted_data_path: Optional[str] = None,
) -> bytes:
"""
Send track-key samples to wrapper-v2 (HTTP POST /decrypt) for CBCS
@@ -916,6 +1016,8 @@ async def decrypt_samples(
"""
keys = [fairplay_key] if use_single_content_key else [PREFETCH_KEY, fairplay_key]
decrypted_data = bytearray()
decrypted_output = open(decrypted_data_path, "wb") if decrypted_data_path else None
decrypted_bytes = 0
last_desc_index: int = 255
total_samples = len(samples)
bytes_processed = 0
@@ -927,6 +1029,14 @@ async def decrypt_samples(
# Pending (sample, aligned_cbc, tail) for one SKD segment, flushed in batches.
crypto_batch: List[tuple] = []
def emit(data: bytes) -> None:
nonlocal decrypted_bytes
if decrypted_output:
decrypted_output.write(data)
else:
decrypted_data.extend(data)
decrypted_bytes += len(data)
async def flush_crypto_batch() -> None:
if not crypto_batch:
return
@@ -939,75 +1049,96 @@ async def decrypt_samples(
if len(plains) != len(chunks):
raise IOError("wrapper-v2: plaintext batch count mismatch")
for s, plain, tail in zip(sources, plains, tails):
_append_reassembled_sample(decrypted_data, s, plain, tail)
emit(_reassemble_cbcs_sample(s, plain, tail))
crypto_batch.clear()
for i, sample in enumerate(samples):
if last_desc_index != sample.desc_index:
await flush_crypto_batch()
if use_single_content_key:
segment_adam = track_id
segment_uri = fairplay_key
else:
key_uri = keys[min(sample.desc_index, len(keys) - 1)]
segment_adam = "0" if key_uri == PREFETCH_KEY else track_id
segment_uri = key_uri
last_desc_index = sample.desc_index
try:
for i, original_sample in enumerate(samples):
sample = (
_with_sample_data(original_sample, _sample_data(original_sample))
if not original_sample.data and original_sample.data_path
else original_sample
)
if last_desc_index != sample.desc_index:
await flush_crypto_batch()
if use_single_content_key:
segment_adam = track_id
segment_uri = fairplay_key
else:
key_uri = keys[min(sample.desc_index, len(keys) - 1)]
segment_adam = "0" if key_uri == PREFETCH_KEY else track_id
segment_uri = key_uri
last_desc_index = sample.desc_index
if not use_single_content_key and segment_adam == "0":
await flush_crypto_batch()
enc_info = (
encryption_info_per_desc.get(sample.desc_index)
if encryption_info_per_desc
and sample.desc_index in encryption_info_per_desc
else encryption_info
)
emit(
_decrypt_cbcs_sample_with_key(
sample, DEFAULT_SONG_DECRYPTION_KEY, enc_info
)
)
bytes_processed += _sample_size(sample)
now = time.time()
if progress_callback and (
i % 50 == 0
or now - last_progress_time > 0.5
or i == total_samples - 1
):
elapsed = now - start_time
speed = bytes_processed / elapsed if elapsed > 0 else 0
progress_callback(i + 1, total_samples, bytes_processed, speed)
last_progress_time = now
continue
if not use_single_content_key and segment_adam == "0":
await flush_crypto_batch()
enc_info = (
encryption_info_per_desc.get(sample.desc_index)
if encryption_info_per_desc
and sample.desc_index in encryption_info_per_desc
else encryption_info
)
decrypted_data.extend(
_decrypt_cbcs_sample_with_key(
sample, DEFAULT_SONG_DECRYPTION_KEY, enc_info
if enc_info.crypt_byte_block and enc_info.skip_byte_block:
raise IOError(
"wrapper-v2 pattern CBCS decrypt is not supported by gamdl's "
"batch decrypt path; use hex-key decrypt for this track"
)
)
bytes_processed += len(sample.data)
parts = _cbcs_ciphertext_for_sample(sample)
if parts is None:
await flush_crypto_batch()
emit(sample.data)
else:
aligned, tail = parts
if len(aligned) == 0:
await flush_crypto_batch()
emit(_reassemble_cbcs_sample(sample, b"", tail))
else:
crypto_batch.append((sample, aligned, tail))
if len(crypto_batch) >= WRAPPER_DECRYPT_BATCH_SIZE:
await flush_crypto_batch()
bytes_processed += _sample_size(sample)
now = time.time()
if progress_callback and (
i % 50 == 0
or now - last_progress_time > 0.5
or i == total_samples - 1
i % 50 == 0 or now - last_progress_time > 0.5 or i == total_samples - 1
):
elapsed = now - start_time
speed = bytes_processed / elapsed if elapsed > 0 else 0
progress_callback(i + 1, total_samples, bytes_processed, speed)
last_progress_time = now
continue
parts = _cbcs_ciphertext_for_sample(sample)
if parts is None:
await flush_crypto_batch()
decrypted_data.extend(sample.data)
else:
aligned, tail = parts
if len(aligned) == 0:
await flush_crypto_batch()
_append_reassembled_sample(decrypted_data, sample, b"", tail)
else:
crypto_batch.append((sample, aligned, tail))
if len(crypto_batch) >= WRAPPER_DECRYPT_BATCH_SIZE:
await flush_crypto_batch()
await flush_crypto_batch()
finally:
if decrypted_output:
decrypted_output.close()
bytes_processed += len(sample.data)
now = time.time()
if progress_callback and (
i % 50 == 0 or now - last_progress_time > 0.5 or i == total_samples - 1
):
elapsed = now - start_time
speed = bytes_processed / elapsed if elapsed > 0 else 0
progress_callback(i + 1, total_samples, bytes_processed, speed)
last_progress_time = now
await flush_crypto_batch()
logger.debug(f"Decrypted {len(samples)} samples ({len(decrypted_data)} bytes)")
logger.debug(f"Decrypted {len(samples)} samples ({decrypted_bytes} bytes)")
return bytes(decrypted_data)
@@ -1043,11 +1174,11 @@ def write_decrypted_m4a(
timescale = 44100 # Default fallback
preferred_desc_index = _preferred_sample_description_index(song_info.samples)
if original_path:
if song_info.moov_data:
orig_data = song_info.ftyp_data + song_info.moov_data
elif original_path:
with open(original_path, "rb") as f:
orig_data = f.read()
elif song_info.moov_data:
orig_data = song_info.ftyp_data + song_info.moov_data
else:
orig_data = None
@@ -1133,11 +1264,11 @@ def write_decrypted_mp4_track(
timescale = 44100 if track_info.handler_type == b"soun" else 90000
preferred_desc_index = _preferred_sample_description_index(track_info.samples)
if original_path:
if track_info.moov_data:
orig_data = track_info.ftyp_data + track_info.moov_data
elif original_path:
with open(original_path, "rb") as f:
orig_data = f.read()
elif track_info.moov_data:
orig_data = track_info.ftyp_data + track_info.moov_data
else:
orig_data = None
@@ -1228,11 +1359,11 @@ def _build_decrypted_track_moov(
timescale = 44100 if track_info.handler_type == b"soun" else 90000
preferred_desc_index = _preferred_sample_description_index(track_info.samples)
if original_path:
if track_info.moov_data:
orig_data = track_info.ftyp_data + track_info.moov_data
elif original_path:
with open(original_path, "rb") as f:
orig_data = f.read()
elif track_info.moov_data:
orig_data = track_info.ftyp_data + track_info.moov_data
else:
orig_data = None
@@ -1301,6 +1432,11 @@ def _decrypted_track_payload_source(track: DecryptedTrack):
return (None, 0, len(track.data), track.data)
def _sample_payload_bytes(samples: List[SampleInfo]) -> bytes:
"""Materialize only the payload bytes for the given samples."""
return b"".join(_sample_data(sample) for sample in samples)
def mux_decrypted_media_direct(
decrypted_media: DecryptedMedia,
output_path: str,
@@ -1310,17 +1446,11 @@ def mux_decrypted_media_direct(
if decrypted_media.video is None:
raise ValueError("direct AV mux requires a video track")
video_moov = _build_decrypted_track_moov(
decrypted_media.video.track_info,
decrypted_media.video.input_path,
)
audio_moov = _build_decrypted_track_moov(
decrypted_media.audio.track_info,
decrypted_media.audio.input_path,
)
video_moov = _build_decrypted_track_moov(decrypted_media.video.track_info)
audio_moov = _build_decrypted_track_moov(decrypted_media.audio.track_info)
extra_track_files = [
(
_build_decrypted_track_moov(caption.track_info, caption.input_path),
_build_decrypted_track_moov(caption.track_info),
_decrypted_track_payload_source(caption),
)
for caption in decrypted_media.captions
@@ -1497,7 +1627,9 @@ async def _decrypt_track_hex(
``True`` (web AAC, muxed MV audio): every sample description uses
``decryption_key``.
"""
track_info = await asyncio.to_thread(extract_song, input_path, handler_type)
track_info = await asyncio.to_thread(
extract_song, input_path, handler_type, file_backed
)
track_key = bytes.fromhex(decryption_key)
if use_single_content_key:
@@ -1581,21 +1713,27 @@ async def decrypt_file_hex(
video_key = decryption_key_video or decryption_key_audio
video_task = asyncio.create_task(
_decrypt_track_hex(input_video_path, video_key, b"vide", file_backed=True)
_decrypt_track_hex(
input_video_path,
video_key,
b"vide",
use_cenc=use_cenc,
file_backed=True,
)
)
caption_tracks = [
track
for track in await asyncio.gather(
asyncio.to_thread(extract_song, input_video_path, b"clcp"),
asyncio.to_thread(extract_song, input_video_path, b"text"),
asyncio.to_thread(extract_song, input_video_path, b"sbtl"),
asyncio.to_thread(extract_song, input_video_path, b"subt"),
asyncio.to_thread(extract_song, input_video_path, b"clcp", True),
asyncio.to_thread(extract_song, input_video_path, b"text", True),
asyncio.to_thread(extract_song, input_video_path, b"sbtl", True),
asyncio.to_thread(extract_song, input_video_path, b"subt", True),
)
if track.samples
]
captions = []
for caption_track in caption_tracks:
caption_data = b"".join(sample.data for sample in caption_track.samples)
caption_data = _sample_payload_bytes(caption_track.samples)
if caption_track.encryption_info:
caption_key = bytes.fromhex(video_key)
caption_enc_info_per_desc = await asyncio.to_thread(
@@ -3106,10 +3244,13 @@ async def _decrypt_track_wrapper(
handler_type: bytes = b"soun",
*,
use_single_content_key: bool = False,
file_backed: bool = False,
progress_callback=None,
) -> DecryptedTrack:
"""Decrypt one track through wrapper-v2 (CBCS via FairPlay SKD)."""
song_info = await asyncio.to_thread(extract_song, input_path, handler_type)
song_info = await asyncio.to_thread(
extract_song, input_path, handler_type, file_backed
)
enc_info = song_info.encryption_info or EncryptionInfo(scheme_type="cbcs")
enc_info_per_desc = None
if song_info.moov_data:
@@ -3119,16 +3260,39 @@ async def _decrypt_track_wrapper(
handler_type,
)
decrypted_data = await decrypt_samples(
wrapper_api,
track_id,
fairplay_key,
song_info.samples,
enc_info,
enc_info_per_desc,
use_single_content_key=use_single_content_key,
progress_callback=progress_callback,
)
temp_path = None
if file_backed:
temp_file = tempfile.NamedTemporaryFile(
prefix="gamdl_decrypted_", suffix=".bin", delete=False
)
temp_path = temp_file.name
temp_file.close()
try:
decrypted_data = await decrypt_samples(
wrapper_api,
track_id,
fairplay_key,
song_info.samples,
enc_info,
enc_info_per_desc,
use_single_content_key=use_single_content_key,
progress_callback=progress_callback,
decrypted_data_path=temp_path,
)
except Exception:
if temp_path:
try:
os.remove(temp_path)
except FileNotFoundError:
pass
raise
if temp_path:
return DecryptedTrack(
input_path,
song_info,
data_path=temp_path,
data_size=os.path.getsize(temp_path),
)
return DecryptedTrack(input_path, song_info, decrypted_data)
@@ -3173,16 +3337,17 @@ async def decrypt_wrapper(
input_video_path,
b"vide",
use_single_content_key=use_single_content_key,
file_backed=True,
progress_callback=progress_callback,
)
)
caption_tracks = [
track
for track in await asyncio.gather(
asyncio.to_thread(extract_song, input_video_path, b"clcp"),
asyncio.to_thread(extract_song, input_video_path, b"text"),
asyncio.to_thread(extract_song, input_video_path, b"sbtl"),
asyncio.to_thread(extract_song, input_video_path, b"subt"),
asyncio.to_thread(extract_song, input_video_path, b"clcp", True),
asyncio.to_thread(extract_song, input_video_path, b"text", True),
asyncio.to_thread(extract_song, input_video_path, b"sbtl", True),
asyncio.to_thread(extract_song, input_video_path, b"subt", True),
)
if track.samples
]
@@ -3190,7 +3355,7 @@ async def decrypt_wrapper(
DecryptedTrack(
input_video_path,
caption_track,
b"".join(sample.data for sample in caption_track.samples),
_sample_payload_bytes(caption_track.samples),
)
for caption_track in caption_tracks
]
@@ -3225,14 +3390,13 @@ def decrypt_samples_hex(
Returns:
Concatenated decrypted sample data.
"""
is_cenc = encryption_info.scheme_type == "cenc"
decrypted = bytearray()
for sample in samples:
key = keys.get(sample.desc_index)
if key is None:
# No key for this desc_index — keep data as-is (shouldn't happen)
decrypted.extend(sample.data)
decrypted.extend(_sample_data(sample))
continue
# Get encryption info for this sample's desc_index (if per-description info exists)
@@ -3241,8 +3405,13 @@ def decrypt_samples_hex(
else:
enc_info = encryption_info
if not sample.data and sample.data_path:
sample = _with_sample_data(sample, _sample_data(sample))
is_cenc = enc_info.scheme_type == "cenc"
if is_cenc:
# AES-128-CTR: per-sample IV from senc, zero-padded to 16 bytes
data = sample.data
iv = sample.iv
if len(iv) < 16:
iv = iv + b"\x00" * (16 - len(iv))
@@ -3252,16 +3421,16 @@ def decrypt_samples_hex(
plaintext = bytearray()
offset = 0
for clear_bytes, encrypted_bytes in sample.subsamples:
plaintext.extend(sample.data[offset : offset + clear_bytes])
plaintext.extend(data[offset : offset + clear_bytes])
offset += clear_bytes
plaintext.extend(
cipher.decrypt(sample.data[offset : offset + encrypted_bytes])
cipher.decrypt(data[offset : offset + encrypted_bytes])
)
offset += encrypted_bytes
plaintext.extend(sample.data[offset:])
plaintext.extend(data[offset:])
decrypted.extend(plaintext)
else:
decrypted.extend(cipher.decrypt(sample.data))
decrypted.extend(cipher.decrypt(data))
else:
# CBCS (AES-128-CBC): constant IV or per-sample IV
@@ -3294,12 +3463,16 @@ def _decrypt_sample_hex(
sample: SampleInfo,
key: Optional[bytes],
encryption_info: EncryptionInfo,
is_cenc: bool,
) -> bytes:
"""Decrypt one sample with a raw AES key."""
if key is None:
return sample.data
data = _sample_data(sample)
if data is not sample.data:
sample = _with_sample_data(sample, data)
if key is None:
return data
is_cenc = encryption_info.scheme_type == "cenc"
if is_cenc:
iv = sample.iv
if len(iv) < 16:
@@ -3307,18 +3480,18 @@ def _decrypt_sample_hex(
cipher = AES.new(key, AES.MODE_CTR, nonce=b"", initial_value=iv)
if not sample.subsamples:
return cipher.decrypt(sample.data)
return cipher.decrypt(data)
plaintext = bytearray()
offset = 0
for clear_bytes, encrypted_bytes in sample.subsamples:
plaintext.extend(sample.data[offset : offset + clear_bytes])
plaintext.extend(data[offset : offset + clear_bytes])
offset += clear_bytes
plaintext.extend(
cipher.decrypt(sample.data[offset : offset + encrypted_bytes])
cipher.decrypt(data[offset : offset + encrypted_bytes])
)
offset += encrypted_bytes
plaintext.extend(sample.data[offset:])
plaintext.extend(data[offset:])
return bytes(plaintext)
if encryption_info.crypt_byte_block and encryption_info.skip_byte_block:
@@ -3349,7 +3522,6 @@ def decrypt_samples_hex_to_file(
release_sample_data: bool = False,
) -> int:
"""Decrypt samples to a raw payload file without building one large bytes object."""
is_cenc = encryption_info.scheme_type == "cenc"
bytes_written = 0
with open(output_path, "wb") as f:
for sample in samples:
@@ -3363,7 +3535,6 @@ def decrypt_samples_hex_to_file(
sample,
keys.get(sample.desc_index),
enc_info,
is_cenc,
)
f.write(decrypted_sample)
sample.size = len(decrypted_sample)
+55 -11
View File
@@ -9,6 +9,8 @@ from pathlib import Path
import structlog
from mutagen.mp4 import MP4, MP4Cover
from yt_dlp import YoutubeDL
from yt_dlp.downloader.hls import HlsFD
from yt_dlp.downloader.http import HttpFD
from ..interface.enums import CoverFormat
from ..interface.interface import AppleMusicInterface
@@ -27,19 +29,40 @@ def _download_ytdlp_process(
result_queue,
) -> None:
try:
Path(download_path).parent.mkdir(parents=True, exist_ok=True)
with YoutubeDL(
{
"quiet": True,
"no_warnings": True,
"outtmpl": download_path,
"allow_unplayable_formats": True,
"overwrites": True,
"fixup": "never",
"noprogress": silent,
"allowed_extractors": ["generic"],
"allow_unplayable_formats": True,
"concurrent_fragment_downloads": 8,
}
) as ydl:
ydl.download(stream_url)
if stream_url.split("?")[0].endswith(".m3u8"):
hls_downloader = HlsFD(ydl, ydl.params)
success, _ = hls_downloader.download(
download_path,
{
"url": stream_url,
"ext": "mp4",
"protocol": "m3u8",
},
)
if not success:
raise RuntimeError("yt-dlp HLS download failed")
else:
http_downloader = HttpFD(ydl, ydl.params)
success, _ = http_downloader.download(
download_path,
{
"url": stream_url,
},
)
if not success:
raise RuntimeError("yt-dlp HTTP download failed")
except Exception as e:
result_queue.put(("error", repr(e), traceback.format_exc()))
@@ -51,6 +74,7 @@ class AppleMusicBaseDownloader:
output_path: str = "./Apple Music",
temp_path: str = ".",
nm3u8dlre_path: str = "N_m3u8DL-RE",
ffmpeg_path: str = "ffmpeg",
download_mode: DownloadMode = DownloadMode.YTDLP,
album_folder_template: str = "{album_artist}/{album}",
compilation_folder_template: str = "Compilations/{album}",
@@ -69,6 +93,7 @@ class AppleMusicBaseDownloader:
self.output_path = output_path
self.temp_path = temp_path
self.nm3u8dlre_path = nm3u8dlre_path
self.ffmpeg_path = ffmpeg_path
self.download_mode = download_mode
self.album_folder_template = album_folder_template
self.compilation_folder_template = compilation_folder_template
@@ -89,10 +114,12 @@ class AppleMusicBaseDownloader:
log = logger.bind(action="initialize_binary_paths")
self.full_nm3u8dlre_path = shutil.which(self.nm3u8dlre_path)
self.full_ffmpeg_path = shutil.which(self.ffmpeg_path)
log = log.debug(
"success",
full_nm3u8dlre_path=self.full_nm3u8dlre_path,
full_ffmpeg_path=self.full_ffmpeg_path,
)
def get_temp_path(
@@ -213,20 +240,36 @@ class AppleMusicBaseDownloader:
return final_path
async def download_stream(self, stream_url: str, download_path: str):
async def download_stream(
self,
stream_url: str,
download_path: str,
):
log = logger.bind(
action="download_stream", stream_url=stream_url, download_path=download_path
)
if self.download_mode == DownloadMode.YTDLP:
await self._download_ytdlp_async(stream_url, download_path)
stream_url_stripped = stream_url.split("?")[0]
if self.download_mode == DownloadMode.NM3U8DLRE:
if (
self.download_mode == DownloadMode.YTDLP
or not stream_url_stripped.endswith(".m3u8")
):
await self._download_ytdlp_async(
stream_url,
download_path,
)
elif self.download_mode == DownloadMode.NM3U8DLRE:
await self._download_nm3u8dlre(stream_url, download_path)
log.debug("success")
async def _download_ytdlp_async(self, stream_url: str, download_path: str) -> None:
async def _download_ytdlp_async(
self,
stream_url: str,
download_path: str,
) -> None:
ctx = multiprocessing.get_context()
result_queue = ctx.Queue()
process = ctx.Process(
@@ -273,6 +316,8 @@ class AppleMusicBaseDownloader:
"--no-log",
"--log-level",
"off",
"--ffmpeg-binary-path",
self.full_ffmpeg_path,
"--save-name",
download_path_obj.stem,
"--save-dir",
@@ -321,7 +366,6 @@ class AppleMusicBaseDownloader:
skip_tagging: bool,
):
mp4 = MP4(media_path)
mp4.clear()
if not skip_tagging:
if cover_bytes is not None:
+25 -19
View File
@@ -159,26 +159,32 @@ class AppleMusicSongDownloader:
self,
download_item: DownloadItem,
) -> None:
encrypted_path = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"encrypted",
".m4a",
)
await self.base.download_stream(
download_item.media.stream_info.audio_track.stream_url,
encrypted_path,
)
if download_item.media.stream_info.audio_track.drm_free:
await self.base.download_stream(
download_item.media.stream_info.audio_track.stream_url,
download_item.staged_path,
)
else:
encrypted_path = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"encrypted",
".m4a",
)
await self.base.download_stream(
download_item.media.stream_info.audio_track.stream_url,
encrypted_path,
)
await self.stage(
encrypted_path,
download_item.staged_path,
download_item.media.media_id,
download_item.media.decryption_key,
download_item.media.stream_info.audio_track.fairplay_key,
download_item.media.stream_info.audio_track.use_cenc,
download_item.media.stream_info.audio_track.use_single_content_key,
)
await self.stage(
encrypted_path,
download_item.staged_path,
download_item.media.media_id,
download_item.media.decryption_key,
download_item.media.stream_info.audio_track.fairplay_key,
download_item.media.stream_info.audio_track.use_cenc,
download_item.media.stream_info.audio_track.use_single_content_key,
)
cover_bytes = (
await self.base.interface.base.get_cover_bytes(
+1 -1
View File
@@ -46,7 +46,7 @@ class AppleMusicUploadedVideoDownloader:
self,
download_item: DownloadItem,
) -> None:
await self.base._download_ytdlp_async(
await self.base.download_stream(
download_item.media.stream_info.video_track.stream_url,
download_item.staged_path,
)
+18 -13
View File
@@ -56,11 +56,6 @@ class AppleMusicBaseInterface:
) -> bool:
return bool(media_metadata["attributes"].get("playParams"))
@staticmethod
def parse_catalog_media_id(media_metadata: dict) -> str:
play_params = media_metadata["attributes"].get("playParams", {})
return play_params.get("catalogId", media_metadata["id"])
@staticmethod
def parse_media_id_from_url(media_metadata: dict) -> str | None:
media_url = media_metadata["attributes"].get("url")
@@ -118,6 +113,14 @@ class AppleMusicBaseInterface:
template_cover_url,
)
@staticmethod
def get_catalog_metadata_from_library(library_metadata: dict) -> dict | None:
data = library_metadata.get("relationships", {}).get("catalog", {}).get("data")
if not data:
return None
return data[0]
@classmethod
async def create(
cls,
@@ -202,8 +205,8 @@ class AppleMusicBaseInterface:
async def get_cover_bytes(self, cover_url: str) -> bytes | None:
log = logger.bind(action="get_cover_bytes", cover_url=cover_url)
async with httpx.AsyncClient() as client:
response = await client.get(cover_url)
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(cover_url, follow_redirects=True)
if response.status_code == 404:
log.debug("cover_not_found")
@@ -263,9 +266,7 @@ class AppleMusicBaseInterface:
self,
metadata: dict,
) -> str:
log = logger.bind(
action="get_cover", media_id=self.parse_catalog_media_id(metadata)
)
log = logger.bind(action="get_cover", media_id=metadata["id"])
template_url = self._get_cover_template_url(metadata)
@@ -352,7 +353,9 @@ class AppleMusicBaseInterface:
),
album_sort=asset_data.get("sort-album"),
artist=asset_data["artistName"],
artist_id=int(asset_data["artistId"]),
artist_id=(
int(asset_data["artistId"]) if asset_data.get("artistId") else None
),
artist_sort=asset_data["sort-artist"],
comment=asset_data.get("comments"),
compilation=asset_data.get("compilation"),
@@ -377,7 +380,9 @@ class AppleMusicBaseInterface:
disc_total=asset_data.get("discCount"),
gapless=asset_data.get("gapless"),
genre=asset_data.get("genre"),
genre_id=int(asset_data["genreId"]),
genre_id=(
int(asset_data["genreId"]) if asset_data.get("genreId") else None
),
lyrics=lyrics if lyrics else None,
media_type=(
MediaType.SONG
@@ -385,7 +390,7 @@ class AppleMusicBaseInterface:
else MediaType.MUSIC_VIDEO
),
rating=MediaRating(asset_data["explicit"]),
storefront=asset_data["s"],
storefront=(int(asset_data["s"]) if asset_data.get("s") else None),
title=asset_data["itemName"],
title_id=int(asset_data["itemId"]),
title_sort=asset_data["sort-name"],
+3 -3
View File
@@ -71,8 +71,8 @@ VALID_URL_PATTERN = re.compile(
r"(?:\?i=(?P<sub_id>[0-9]+))?"
r"|"
r"(?:/(?P<library_storefront>[a-z]{2}))?"
r"/library/(?P<library_type>playlist|albums)"
r"/(?P<library_id>p\.[a-zA-Z0-9]+|l\.[a-zA-Z0-9]+)"
r"/library/(?P<library_type>playlist|albums|songs|music-videos)"
r"/(?P<library_id>[pli]\.[a-zA-Z0-9]+)"
r")"
)
@@ -95,7 +95,7 @@ ARTIST_AUTO_SELECT_STR_MAP = {
"music-videos": "Music Videos",
}
SONG_CODEC_FLAVOR_MAP = {
MEDIA_CODEC_FLAVOR_MAP = {
"aac-web": "28:ctrp256",
"aac-he-web": "32:ctrp64",
"aac-fps-web": "30:cbcp256",
+5 -4
View File
@@ -6,7 +6,7 @@ from .constants import (
FOURCC_MAP,
MEDIA_RATING_STR_MAP,
MEDIA_TYPE_STR_MAP,
SONG_CODEC_FLAVOR_MAP,
MEDIA_CODEC_FLAVOR_MAP,
)
@@ -68,7 +68,7 @@ class SongCodec(Enum):
@property
def flavor(self) -> str | None:
return SONG_CODEC_FLAVOR_MAP.get(self.value)
return MEDIA_CODEC_FLAVOR_MAP.get(self.value)
@property
def is_cenc(self) -> bool:
@@ -80,8 +80,9 @@ class MusicVideoCodec(Enum):
H265 = "h265"
ASK = "ask"
def fourcc(self) -> str:
return FOURCC_MAP[self.value]
@property
def fourcc(self) -> str | None:
return FOURCC_MAP.get(self.value)
class MusicVideoResolution(Enum):
+44 -32
View File
@@ -101,19 +101,16 @@ class AppleMusicInterface:
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
is_library=is_library,
index=index,
total=total,
media_metadata=media_metadata,
playlist_metadata=playlist_metadata,
)
if index is not None:
media.index = index
if total is not None:
media.total = total
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
try:
async for media in self.song.get_media(media):
yield media
@@ -133,16 +130,17 @@ class AppleMusicInterface:
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
is_library=is_library,
index=index,
total=total,
media_metadata=media_metadata,
playlist_metadata=playlist_metadata,
)
if index is not None:
media.index = index
if total is not None:
media.total = total
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
@@ -187,12 +185,14 @@ class AppleMusicInterface:
try:
base_media.media_metadata = (
await self.base.apple_music_api.get_library_album(
media_id,
)
if is_library
else await self.base.apple_music_api.get_album(
media_id,
await (
self.base.apple_music_api.get_library_album(
media_id,
)
if is_library
else self.base.apple_music_api.get_album(
media_id,
)
)
)["data"][0]
@@ -214,6 +214,7 @@ class AppleMusicInterface:
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
is_library=is_library,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
@@ -221,6 +222,7 @@ class AppleMusicInterface:
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
is_library=is_library,
)
)
for index, track in enumerate(tracks)
@@ -246,12 +248,14 @@ class AppleMusicInterface:
try:
base_media.media_metadata = (
await self.base.apple_music_api.get_library_playlist(
media_id,
)
if is_library
else await self.base.apple_music_api.get_playlist(
media_id,
await (
self.base.apple_music_api.get_library_playlist(
media_id,
)
if is_library
else self.base.apple_music_api.get_playlist(
media_id,
)
)
)["data"][0]
@@ -283,6 +287,7 @@ class AppleMusicInterface:
index=index,
media_metadata=track,
playlist_metadata=base_media.media_metadata,
is_library=is_library,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
@@ -290,6 +295,7 @@ class AppleMusicInterface:
index=index,
media_metadata=track,
playlist_metadata=base_media.media_metadata,
is_library=is_library,
)
)
for index, track in enumerate(tracks)
@@ -425,32 +431,38 @@ class AppleMusicInterface:
url_info.type,
)
if url_info.type == "song" or url_info.sub_id:
if (
url_info.type == "song"
or url_info.library_type == "songs"
or url_info.sub_id
):
async for media in self._get_song_media(
media_id=url_info.sub_id or url_info.id,
media_id=url_info.sub_id or url_info.id or url_info.library_id,
index=0,
total=1,
is_library=bool(url_info.library_type),
):
yield media
elif url_info.type == "music-video":
elif url_info.type == "music-video" or url_info.library_type == "music-videos":
async for media in self._get_music_video_media(
media_id=url_info.id,
media_id=url_info.id or url_info.library_id,
index=0,
total=1,
is_library=bool(url_info.library_type),
):
yield media
elif url_info.type == "album" or url_info.library_type == "albums":
async for media in self._get_album_media(
media_id=url_info.library_id or url_info.id,
media_id=url_info.id or url_info.library_id,
is_library=bool(url_info.library_type),
):
yield media
elif url_info.type == "playlist" or url_info.library_type == "playlist":
async for media in self._get_playlist_media(
media_id=url_info.library_id or url_info.id,
media_id=url_info.id or url_info.library_id,
is_library=bool(url_info.library_type),
):
yield media
+95 -39
View File
@@ -57,7 +57,12 @@ class AppleMusicMusicVideoInterface:
return itunes_page["storePlatformData"]["product-dv"]["results"][url_media_id]
def _get_m3u8_master_url_from_webplayback(self, webplayback: dict) -> str:
log = logger.bind(action="get_m3u8_master_url_from_webplayback")
m3u8_master_url = webplayback["hls-playlist-url"]
log.debug("success", m3u8_master_url=m3u8_master_url)
return m3u8_master_url
def _get_m3u8_master_url_from_itunes_page_metadata(
@@ -97,7 +102,7 @@ class AppleMusicMusicVideoInterface:
) -> MediaTags:
log = logger.bind(
action="get_music_video_tags",
media_id=self.base.parse_catalog_media_id(metadata),
media_id=metadata["id"],
)
url_media_id = self.base.parse_media_id_from_url(metadata)
@@ -122,7 +127,8 @@ class AppleMusicMusicVideoInterface:
genre_id=int(itunes_page_metadata["genres"][0]["genreId"]),
media_type=MediaType.MUSIC_VIDEO,
storefront=self.base.itunes_api.storefront_id,
title=lookup_metadata[0]["trackCensoredName"],
title=lookup_metadata[0]["trackName"],
title_sort=lookup_metadata[0]["trackCensoredName"],
title_id=int(metadata["id"]),
rating=rating,
)
@@ -134,7 +140,8 @@ class AppleMusicMusicVideoInterface:
if not album:
return tags
tags.album = lookup_metadata[1]["collectionCensoredName"]
tags.album = lookup_metadata[1]["collectionName"]
tags.album_sort = lookup_metadata[1]["collectionCensoredName"]
tags.album_artist = lookup_metadata[1]["artistName"]
tags.album_id = int(itunes_page_metadata["collectionId"])
tags.disc = lookup_metadata[0]["discNumber"]
@@ -147,39 +154,51 @@ class AppleMusicMusicVideoInterface:
return tags
async def get_stream_info(
async def get_m3u8_master_url(
self,
metadata: dict,
itunes_page_metadata: dict,
) -> StreamInfoAv | None:
log = logger.bind(
action="get_music_video_stream_info",
media_id=self.base.parse_catalog_media_id(metadata),
)
) -> str | None:
url_media_id = self.base.parse_media_id_from_url(metadata)
m3u8_master_url = None
if url_media_id == metadata["id"]:
m3u8_master_url = self._get_m3u8_master_url_from_itunes_page_metadata(
return self._get_m3u8_master_url_from_itunes_page_metadata(
itunes_page_metadata,
)
webplayback_response = await self.base.apple_music_api.get_webplayback(
metadata["id"]
)
return self._get_m3u8_master_url_from_webplayback(
webplayback_response["songList"][0],
)
async def _get_stream_info(
self,
m3u8_master_url: str | None,
codec: MusicVideoCodec,
) -> StreamInfoAv | None:
log = logger.bind(
action="get_music_video_stream_info",
m3u8_master_url=m3u8_master_url,
codec=codec.value,
)
if not m3u8_master_url:
webplayback_response = await self.base.apple_music_api.get_webplayback(
metadata["id"]
)
m3u8_master_url = self._get_m3u8_master_url_from_webplayback(
webplayback_response["songList"][0],
)
log.debug("no_m3u8_master_url")
return None
playlist_master_m3u8_obj = m3u8.loads(
(await self.base.get_response(m3u8_master_url)).text
)
playlist_master_m3u8_obj.base_uri = m3u8_master_url.rpartition("/")[0]
stream_info_video = await self._get_stream_info_video(playlist_master_m3u8_obj)
stream_info_video = await self._get_stream_info_video(
playlist_master_m3u8_obj,
codec,
)
stream_info_audio = await self._get_stream_info_audio(
playlist_master_m3u8_obj.data,
codec,
)
if not stream_info_video or not stream_info_audio:
return None
@@ -207,20 +226,20 @@ class AppleMusicMusicVideoInterface:
def _get_video_playlist_from_resolution(
self,
video_playlists: list[m3u8.Playlist],
codec: MusicVideoCodec,
) -> m3u8.Playlist | None:
playlist_results = []
for codec_index, codec in enumerate(self.codec_priority):
for playlist in video_playlists:
if playlist.stream_info.codecs.startswith(codec.fourcc()):
playlist_results.append((codec_index, playlist))
playlist_results = [
playlist
for playlist in video_playlists
if playlist.stream_info.codecs.startswith(codec.fourcc)
]
if not playlist_results:
return None
def sort_key(
item: tuple[int, m3u8.Playlist],
) -> tuple[bool, int, int, int, int]:
codec_index, playlist = item
playlist: m3u8.Playlist,
) -> tuple[bool, int, int, int]:
playlist_resolution = playlist.stream_info.resolution[-1]
bandwidth = playlist.stream_info.bandwidth
exceeds_resolution = playlist_resolution > int(self.resolution)
@@ -229,13 +248,12 @@ class AppleMusicMusicVideoInterface:
return (
exceeds_resolution,
resolution_difference,
codec_index,
-playlist_resolution,
-bandwidth,
)
playlist_results.sort(key=sort_key)
return playlist_results[0][1]
return playlist_results[0]
def _get_best_stereo_audio_playlist(
self,
@@ -314,12 +332,14 @@ class AppleMusicMusicVideoInterface:
async def _get_stream_info_video(
self,
playlist_master_m3u8_obj: m3u8.M3U8,
codec: MusicVideoCodec,
) -> StreamInfo | None:
stream_info = StreamInfo()
if MusicVideoCodec.ASK not in self.codec_priority:
if codec != MusicVideoCodec.ASK:
playlist = self._get_video_playlist_from_resolution(
playlist_master_m3u8_obj.playlists,
codec,
)
else:
playlist = await self._get_video_playlist_from_user(
@@ -345,10 +365,11 @@ class AppleMusicMusicVideoInterface:
async def _get_stream_info_audio(
self,
playlist_master_data: dict,
codec: MusicVideoCodec,
) -> StreamInfo | None:
stream_info = StreamInfo()
if MusicVideoCodec.ASK not in self.codec_priority:
if codec != MusicVideoCodec.ASK:
playlist = self._get_best_stereo_audio_playlist(playlist_master_data)
else:
playlist = await self._get_audio_playlist_from_user(playlist_master_data)
@@ -368,6 +389,27 @@ class AppleMusicMusicVideoInterface:
return stream_info
async def get_stream_info(
self,
media_id: str,
m3u8_master_url: str | None,
) -> StreamInfoAv:
stream_info = None
for codec in self.codec_priority:
stream_info = await self._get_stream_info(m3u8_master_url, codec)
if stream_info:
break
if not stream_info:
raise GamdlInterfaceFormatNotAvailableError(
media_id=media_id,
codec=[codec.value for codec in self.codec_priority],
)
return stream_info
async def get_decryption_key(
self,
stream_info: StreamInfoAv,
@@ -394,10 +436,24 @@ class AppleMusicMusicVideoInterface:
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_music_video(media.media_id)
await (
self.base.apple_music_api.get_library_music_video(media.media_id)
if media.is_library
else self.base.apple_music_api.get_music_video(media.media_id)
)
)["data"][0]
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
if media.media_metadata["attributes"].get("playParams", {}).get("isLibrary"):
catalog_metadata = self.base.get_catalog_metadata_from_library(
media.media_metadata
)
if catalog_metadata:
media.media_id = catalog_metadata["id"]
media.is_library = False
media.media_metadata = catalog_metadata
if media.is_library:
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
yield media
@@ -420,20 +476,20 @@ class AppleMusicMusicVideoInterface:
playback["songList"][0]["assets"][0]["metadata"],
)
else:
playback = None
media.tags = await self.get_tags(
media.media_metadata,
itunes_page_metadata,
)
media.stream_info = await self.get_stream_info(
m3u8_master_url = await self.get_m3u8_master_url(
media.media_metadata,
itunes_page_metadata,
)
if not media.stream_info:
raise GamdlInterfaceFormatNotAvailableError(
media.media_id,
self.codec_priority,
)
media.stream_info = await self.get_stream_info(
media.media_id,
m3u8_master_url,
)
if (
not media.stream_info.video_track.widevine_pssh
+152 -104
View File
@@ -3,7 +3,6 @@ import base64
import datetime
import json
import re
import struct
from typing import AsyncGenerator, Callable
from xml.dom import minidom
from xml.etree import ElementTree
@@ -13,7 +12,7 @@ import structlog
from .base import AppleMusicBaseInterface
from .constants import DRM_DEFAULT_KEY_MAPPING, MP4_FORMAT_CODECS, SONG_CODEC_REGEX_MAP
from .enums import MediaRating, MediaType, SongCodec, SyncedLyricsFormat
from .enums import SongCodec, SyncedLyricsFormat
from .exceptions import (
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceFormatNotAvailableError,
@@ -24,11 +23,9 @@ from .types import (
DecryptionKeyAv,
Lyrics,
MediaFileFormat,
MediaTags,
StreamInfo,
StreamInfoAv,
)
import httpx
logger = structlog.get_logger(__name__)
@@ -56,9 +53,13 @@ class AppleMusicSongInterface:
) -> Lyrics | None:
log = logger.bind(
action="get_lyrics",
song_id=self.base.parse_catalog_media_id(song_metadata),
song_id=song_metadata["id"],
)
if song_metadata["attributes"]["playParams"].get("isLibrary"):
log.debug("library_song_no_lyrics")
return None
if not song_metadata["attributes"]["hasLyrics"]:
log.debug("no_lyrics")
return None
@@ -69,7 +70,7 @@ class AppleMusicSongInterface:
):
song_metadata = (
await self.base.apple_music_api.get_song(
self.base.parse_catalog_media_id(song_metadata)
song_metadata["id"],
)
)["data"][0]
@@ -190,95 +191,91 @@ class AppleMusicSongInterface:
return f"[{timestamp.strftime('%M:%S.%f')[:-4]}]{text}"
def _get_m3u8_from_playback(self, playback: dict) -> str | None:
return playback["songList"][0].get("hls-playlist-url")
async def get_tags(
self,
asset_data: dict,
lyrics: str | None = None,
) -> MediaTags:
log = logger.bind(action="get_song_tags")
tags = MediaTags(
album=asset_data["playlistName"],
album_artist=asset_data["playlistArtistName"],
album_id=int(asset_data["playlistId"]),
album_sort=asset_data["sort-album"],
artist=asset_data["artistName"],
artist_id=int(asset_data["artistId"]),
artist_sort=asset_data["sort-artist"],
comment=asset_data.get("comments"),
compilation=asset_data["compilation"],
composer=asset_data.get("composerName"),
composer_id=(
int(asset_data.get("composerId"))
if asset_data.get("composerId")
else None
),
composer_sort=asset_data.get("sort-composer"),
copyright=asset_data.get("copyright"),
date=(
await self.base.get_media_date(asset_data["playlistId"])
if self.use_album_date
else (
self.base.parse_date(asset_data["releaseDate"])
if asset_data.get("releaseDate")
else None
)
),
disc=asset_data["discNumber"],
disc_total=asset_data["discCount"],
gapless=asset_data["gapless"],
genre=asset_data.get("genre"),
genre_id=int(asset_data["genreId"]),
lyrics=lyrics if lyrics else None,
media_type=MediaType.SONG,
rating=MediaRating(asset_data["explicit"]),
storefront=asset_data["s"],
title=asset_data["itemName"],
title_id=int(asset_data["itemId"]),
title_sort=asset_data["sort-name"],
track=asset_data["trackNumber"],
track_total=asset_data["trackCount"],
xid=asset_data.get("xid"),
def _switch_m3u8_master_url_to_default(self, m3u8_master_url: str) -> str:
return re.sub(
r"(P\d+)_[^/]+(\.m3u8)",
r"\1_default\2",
m3u8_master_url,
)
log.debug("success", tags=tags)
def _get_m3u8_from_playback(self, playback: dict) -> str | None:
log = logger.bind(action="get_m3u8_master_url_from_playback")
return tags
m3u8_master_url = playback["songList"][0].get("hls-playlist-url")
if m3u8_master_url:
m3u8_master_url = self._switch_m3u8_master_url_to_default(m3u8_master_url)
log.debug("success", m3u8_master_url=m3u8_master_url)
return m3u8_master_url
log.debug("no_m3u8_master_url")
async def _get_m3u8_master_url_from_metadata(
self,
song_metadata: dict,
) -> str | None:
log = logger.bind(
action="get_m3u8_master_url_from_metadata",
song_id=song_metadata["id"],
)
if song_metadata["attributes"]["playParams"].get("isLibrary"):
log.debug("library_song_no_m3u8_master_url")
return None
async def _get_m3u8_from_metadata(self, song_metadata: dict) -> str | None:
if "extendedAssetUrls" not in song_metadata["attributes"]:
song_metadata = (
await self.base.apple_music_api.get_song(
self.base.parse_catalog_media_id(song_metadata),
song_metadata["id"],
)
)["data"][0]
return song_metadata["attributes"]["extendedAssetUrls"].get("enhancedHls")
enhanced = song_metadata["attributes"]["extendedAssetUrls"].get("enhancedHls")
if enhanced:
enhanced = self._switch_m3u8_master_url_to_default(enhanced)
log.debug("success", m3u8_master_url=enhanced)
return enhanced
log.debug("no_m3u8_master_url")
return None
async def get_m3u8_master_url(
self,
playback: dict | None,
song_metadata: dict | None,
) -> str | None:
if playback:
return self._get_m3u8_from_playback(playback)
else:
return await self._get_m3u8_master_url_from_metadata(song_metadata)
async def get_stream_info(
self,
media_id: str,
is_library: bool,
m3u8_master_url: str | None = None,
webplayback: dict | None = None,
) -> StreamInfoAv:
stream_info = None
for codec in self.codec_priority:
if codec.is_web:
stream_info = await self._get_web_stream_info(webplayback, codec)
else:
stream_info = await self._get_stream_info(m3u8_master_url, codec)
if is_library:
stream_info = await self._get_library_stream_info(webplayback)
else:
for codec in self.codec_priority:
if codec.is_web:
stream_info = await self._get_web_stream_info(webplayback, codec)
else:
stream_info = await self._get_stream_info(m3u8_master_url, codec)
if stream_info:
break
if stream_info:
break
if not stream_info:
raise GamdlInterfaceFormatNotAvailableError(
media_id=media_id,
formats=[codec.value for codec in self.codec_priority],
codec=[codec.value for codec in self.codec_priority],
)
return stream_info
@@ -443,11 +440,15 @@ class AppleMusicSongInterface:
async def _get_web_stream_info(
self,
webplayback: dict,
webplayback: dict | None,
codec: SongCodec,
) -> StreamInfoAv:
log = logger.bind(action="get_web_song_stream_info")
if not webplayback:
log.debug("no_webplayback")
return None
flavor = codec.flavor
stream_info = StreamInfo(
@@ -481,16 +482,55 @@ class AppleMusicSongInterface:
return stream_info_av
async def _get_library_stream_info(
self,
webplayback: dict | None,
) -> StreamInfoAv | None:
log = logger.bind(action="get_library_song_stream_info")
if not webplayback:
log.debug("no_webplayback")
return None
stream_info = StreamInfo(drm_free=True)
if len(webplayback["songList"][0]["assets"]) == 0:
log.debug("no_matching_asset")
return None
asset = webplayback["songList"][0]["assets"][0]
stream_info.stream_url = asset["URL"]
stream_info_av = StreamInfoAv(
media_id=webplayback["songList"][0]["songId"],
audio_track=stream_info,
file_format=MediaFileFormat.M4A,
)
log.debug("success", stream_info=stream_info_av)
return stream_info_av
async def get_media(
self,
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_song(media.media_id)
await (
self.base.apple_music_api.get_library_song(media.media_id)
if media.is_library
else self.base.apple_music_api.get_song(media.media_id)
)
)["data"][0]
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
if media.media_metadata["attributes"].get("playParams", {}).get("isLibrary"):
catalog_metadata = self.base.get_catalog_metadata_from_library(
media.media_metadata
)
if catalog_metadata:
media.media_id = catalog_metadata["id"]
media.is_library = False
media.media_metadata = catalog_metadata
yield media
@@ -510,45 +550,54 @@ class AppleMusicSongInterface:
media.lyrics = await self.get_lyrics(media.media_metadata)
if self.base.wrapper_api:
playback = await self.base.wrapper_api.get_playback(media.media_id)
playback = (
await self.base.wrapper_api.get_playback(media.media_id)
if not media.is_library
else None
)
webplayback = (
await self.base.apple_music_api.get_webplayback(
media.media_id,
media.is_library,
)
if media.is_library
or any(codec.is_web for codec in self.codec_priority)
else None
)
else:
playback = None
webplayback = await self.base.apple_music_api.get_webplayback(
media.media_id,
media.is_library,
)
if playback:
media.tags = await self.base.get_tags_from_asset_info(
playback["songList"][0]["assets"][0]["metadata"],
media.lyrics.unsynced if media.lyrics else None,
self.use_album_date,
)
if not self.skip_stream_info:
m3u8_master_url = self._get_m3u8_from_playback(playback)
webplayback = (
await self.base.apple_music_api.get_webplayback(media.media_id)
if any(codec.is_web for codec in self.codec_priority)
else None
)
media.stream_info = await self.get_stream_info(
media.media_id,
m3u8_master_url,
webplayback,
)
else:
webplayback = await self.base.apple_music_api.get_webplayback(
media.media_id
)
media.tags = await self.base.get_tags_from_asset_info(
webplayback["songList"][0]["assets"][0]["metadata"],
media.lyrics.unsynced if media.lyrics else None,
self.use_album_date,
)
if not self.skip_stream_info:
m3u8_master_url = await self._get_m3u8_from_metadata(
media.media_metadata
)
media.stream_info = await self.get_stream_info(
media.media_id,
m3u8_master_url,
webplayback,
)
if media.stream_info:
if (
if not self.skip_stream_info:
m3u8_master_url = await self.get_m3u8_master_url(
playback,
media.media_metadata,
)
media.stream_info = await self.get_stream_info(
media.media_id,
media.is_library,
m3u8_master_url,
webplayback,
)
if media.stream_info.audio_track.drm_free:
pass
elif (
not self.base.wrapper_api
and not media.stream_info.audio_track.widevine_pssh
) or (
@@ -557,7 +606,6 @@ class AppleMusicSongInterface:
and not media.stream_info.audio_track.use_cenc
):
raise GamdlInterfaceDecryptionNotAvailableError(media_id=media.media_id)
elif media.stream_info.audio_track.widevine_pssh:
media.decryption_key = DecryptionKeyAv(
audio_track=await self.base.get_decryption_key(
+2
View File
@@ -122,6 +122,7 @@ class StreamInfo:
codec: str = None
width: int = None
height: int = None
drm_free: bool = False
use_cenc: bool = False
use_single_content_key: bool = True
@@ -156,6 +157,7 @@ class Cover:
@dataclass
class AppleMusicMedia:
media_id: str
is_library: bool = False
index: int = 0
total: int = 0
partial: bool = True
+2 -1
View File
@@ -79,6 +79,7 @@ class AppleMusicUploadedVideoInterface:
file_format=MediaFileFormat.M4V,
video_track=StreamInfo(
stream_url=stream_url,
drm_free=True,
),
)
@@ -113,7 +114,7 @@ class AppleMusicUploadedVideoInterface:
await self.base.apple_music_api.get_uploaded_video(media.media_id)
)["data"][0]
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
media.media_id = media["id"]
yield media
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "gamdl"
version = "3.6"
version = "3.7.4"
description = "A command-line app for downloading Apple Music songs, music videos and post videos."
readme = "README.md"
license = "MIT"
Generated
+1 -1
View File
@@ -223,7 +223,7 @@ wheels = [
[[package]]
name = "gamdl"
version = "3.6"
version = "3.7.4"
source = { virtual = "." }
dependencies = [
{ name = "async-lru" },