Compare commits

...

27 Commits

Author SHA1 Message Date
Rafael Moraes 9375c2fccd Bump version to 3.3 2026-04-24 19:48:58 -03:00
Rafael Moraes c83e47df0c Remove total arg from media fetch calls 2026-04-24 19:48:27 -03:00
Rafael Moraes 715820e357 Bump version to 3.2 2026-04-24 16:17:49 -03:00
Rafael Moraes 137a739af2 Collect async generators for concurrency 2026-04-24 16:05:37 -03:00
Rafael Moraes 23220d1827 Limit download logging and use interface exception 2026-04-24 15:48:14 -03:00
Rafael Moraes 3c7ea272af Skip partial media; Remove flat filter exception 2026-04-24 15:44:40 -03:00
Rafael Moraes 34a92b6efc Refactor interface media fetching 2026-04-24 15:44:19 -03:00
Rafael Moraes 3a907cb76c Remove skip_decryption_key_non_legacy arg 2026-04-24 13:02:22 -03:00
Rafael Moraes 90646e7193 Use base.use_wrapper for decryption checks 2026-04-24 13:02:07 -03:00
Rafael Moraes 3b2875ccd1 Remove use_wrapper parameter and attribute 2026-04-24 12:59:01 -03:00
Rafael Moraes a989d9fefa Include index and total for music-video media fetch 2026-04-24 12:17:19 -03:00
Rafael Moraes fd3b6216c9 Use error() for URL parse errors 2026-04-24 12:08:24 -03:00
Rafael Moraes 84c21c0013 Pass total=1 when fetching single Apple Music song 2026-04-24 12:06:49 -03:00
Rafael Moraes aca3339b16 Remove string fallback for media_index 2026-04-24 12:04:52 -03:00
Rafael Moraes 6d6f9f4441 Provide index=0 to _get_song_media call 2026-04-24 12:01:51 -03:00
Rafael Moraes fe98bdb42c Process download items inline, remove queue 2026-04-24 11:55:35 -03:00
Rafael Moraes 7c8b20d8f3 Include track index/total in media objects 2026-04-24 11:55:11 -03:00
Rafael Moraes 6232493eed Add index and total fields to AppleMusicMedia 2026-04-24 11:54:57 -03:00
Rafael Moraes 09997bd6a1 Document --wrapper-m3u8-ip CLI option 2026-04-24 11:36:32 -03:00
Rafael Moraes 54c318908c Bump version to 3.1 2026-04-24 11:33:59 -03:00
Rafael Moraes dc6f2e8506 Use ExceptionPrettyPrinter and .exception logging 2026-04-24 11:26:21 -03:00
Rafael Moraes eff41a40f5 Await get_wrapper_m3u8 call 2026-04-24 11:22:33 -03:00
Rafael Moraes b00163a71c Add optional m3u8 wrapper support 2026-04-24 11:18:01 -03:00
Rafael Moraes 9f60043375 Add wrapper m3u8 IP and consolidate use_wrapper 2026-04-24 11:17:34 -03:00
Rafael Moraes 004ecd7c64 Guard against missing response on HTTP errors 2026-04-24 11:17:04 -03:00
Rafael Moraes 581bb7e094 Make GamdlApiResponseError.content optional 2026-04-24 11:15:57 -03:00
Rafael Moraes 5fd10d897e Extract cover URL formatting to helper 2026-04-23 11:45:57 -03:00
20 changed files with 460 additions and 520 deletions
+1
View File
@@ -142,6 +142,7 @@ The file is created automatically on first run. Command-line arguments override
| `--cover-format` | Cover format | `jpg` |
| `--cover-size` | Cover size in pixels | `1200` |
| `--wvd-path` | .wvd file path | - |
| `--wrapper-m3u8-ip` | Wrapper m3u8 IP address and port | - |
| **Song Options** | | |
| `--synced-lyrics-format` | Synced lyrics format | `lrc` |
| `--song-codec-priority` | Comma-separated codec priority | `aac-legacy` |
+1 -1
View File
@@ -1 +1 @@
__version__ = "3.0"
__version__ = "3.3"
+17 -10
View File
@@ -70,6 +70,7 @@ class AppleMusicApi:
async def get_token() -> str:
log = logger.bind(action="get_token")
response = None
async with httpx.AsyncClient() as client:
try:
response = await client.get(
@@ -81,7 +82,7 @@ class AppleMusicApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching Apple Music homepage",
status_code=response.status_code,
status_code=response.status_code if response is not None else None,
)
index_js_uri_match = re.search(
@@ -94,6 +95,7 @@ class AppleMusicApi:
)
index_js_uri = index_js_uri_match.group(1)
response = None
async with httpx.AsyncClient(follow_redirects=True) as client:
try:
response = await client.get(
@@ -104,7 +106,7 @@ class AppleMusicApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching index.js page",
status_code=response.status_code,
status_code=response.status_code if response is not None else None,
)
token_match = re.search('(?=eyJh)(.*?)(?=")', index_js_page)
@@ -124,6 +126,7 @@ class AppleMusicApi:
) -> dict:
log = logger.bind(action="get_account_info", meta=meta)
response = None
async with httpx.AsyncClient() as client:
try:
response = await client.get(
@@ -142,7 +145,7 @@ class AppleMusicApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching account info",
status_code=response.status_code,
status_code=response.status_code if response is not None else None,
)
log.debug("success", account_info=account_info)
@@ -243,6 +246,7 @@ class AppleMusicApi:
*args,
**kwargs,
) -> "AppleMusicApi":
response = None
async with httpx.AsyncClient() as client:
try:
response = await client.get(wrapper_account_url)
@@ -251,7 +255,7 @@ class AppleMusicApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching wrapper account info",
status_code=response.status_code,
status_code=response.status_code if response is not None else None,
)
return await cls.create(
@@ -266,6 +270,7 @@ class AppleMusicApi:
uri: str,
params: dict | None = None,
) -> dict:
response = None
try:
response = await self.client.get(
APPLE_MUSIC_AMP_API_URL + uri,
@@ -276,8 +281,8 @@ class AppleMusicApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching from AMP API",
content=response.text,
status_code=response.status_code,
content=response.text if response is not None else None,
status_code=response.status_code if response is not None else None,
)
if "errors" in response_json:
@@ -533,6 +538,7 @@ class AppleMusicApi:
) -> dict:
log = logger.bind(action="get_webplayback", track_id=track_id)
response = None
try:
response = await self.client.post(
APPLE_MUSIC_WEBPLAYBACK_API_URL,
@@ -546,8 +552,8 @@ class AppleMusicApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching webplayback data",
content=response.text,
status_code=response.status_code,
content=response.text if response is not None else None,
status_code=response.status_code if response is not None else None,
)
if "dialog" in webplayback:
@@ -570,6 +576,7 @@ class AppleMusicApi:
) -> dict:
log = logger.bind(action="get_license_exchange", track_id=track_id)
response = None
try:
response = await self.client.post(
APPLE_MUSIC_LICENSE_API_URL,
@@ -587,8 +594,8 @@ class AppleMusicApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching license exchange data",
content=response.text,
status_code=response.status_code,
content=response.text if response is not None else None,
status_code=response.status_code if response is not None else None,
)
if license_exchange.get("status") != 0:
+1 -1
View File
@@ -9,7 +9,7 @@ class GamdlApiResponseError(GamdlApiError):
def __init__(
self,
message: str,
content: str,
content: str | None = None,
status_code: int | None = None,
):
self.message = message
+8 -5
View File
@@ -30,6 +30,7 @@ class ItunesApi:
async def get_storefront_id(storefront: str) -> int:
log = logger.bind(action="get_storefront_id", storefront=storefront)
response = None
async with httpx.AsyncClient() as client:
try:
response = await client.get(APPLE_MUSIC_MUSIC_KIT_URL)
@@ -38,7 +39,7 @@ class ItunesApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching MusicKit content",
status_code=response.status_code,
status_code=response.status_code if response is not None else None,
)
normalized_storefront = storefront.upper()
@@ -92,6 +93,7 @@ class ItunesApi:
) -> dict:
log = logger.bind(action="get_lookup_result", media_id=media_id, entity=entity)
response = None
try:
response = await self.client.get(
ITUNES_LOOKUP_API_URL,
@@ -107,8 +109,8 @@ class ItunesApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching iTunes lookup result",
content=response.text,
status_code=response.status_code,
content=response.text if response is not None else None,
status_code=response.status_code if response is not None else None,
)
log.debug("success", lookup_result=lookup_result)
@@ -126,6 +128,7 @@ class ItunesApi:
media_id=media_id,
)
response = None
try:
response = await self.client.get(
ITUNES_PAGE_API_URL.format(media_type=media_type, media_id=media_id),
@@ -138,8 +141,8 @@ class ItunesApi:
except httpx.HTTPError:
raise GamdlApiResponseError(
"Error fetching iTunes page",
content=response.text,
status_code=response.status_code,
content=response.text if response is not None else None,
status_code=response.status_code if response is not None else None,
)
log.debug("success", itunes_page=itunes_page)
+68 -62
View File
@@ -1,6 +1,5 @@
import asyncio
import logging
import traceback
from functools import wraps
from pathlib import Path
@@ -18,9 +17,7 @@ from ..downloader import (
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
DownloadItem,
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderFlatFilterExcludedError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderSyncedLyricsOnlyError,
)
@@ -32,6 +29,7 @@ from ..interface import (
AppleMusicUploadedVideoInterface,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceFlatFilterExcludedError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceUrlParseError,
@@ -78,6 +76,7 @@ async def main(config: CliConfig):
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.ExceptionPrettyPrinter(),
custom_structlog_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
@@ -142,6 +141,8 @@ async def main(config: CliConfig):
apple_music_api=apple_music_api,
cover_format=config.cover_format,
cover_size=config.cover_size,
use_wrapper=config.use_wrapper,
wrapper_m3u8_ip=config.wrapper_m3u8_ip,
wvd_path=config.wvd_path,
)
@@ -150,7 +151,6 @@ async def main(config: CliConfig):
synced_lyrics_format=config.synced_lyrics_format,
codec_priority=config.song_codec_piority,
use_album_date=config.use_album_date,
skip_decryption_key_non_legacy=config.use_wrapper,
skip_stream_info=config.synced_lyrics_only,
ask_codec_function=interactive_prompts.ask_song_codec,
)
@@ -184,7 +184,6 @@ async def main(config: CliConfig):
mp4decrypt_path=config.mp4decrypt_path,
ffmpeg_path=config.ffmpeg_path,
mp4box_path=config.mp4box_path,
use_wrapper=config.use_wrapper,
wrapper_decrypt_ip=config.wrapper_decrypt_ip,
download_mode=config.download_mode,
album_folder_template=config.album_folder_template,
@@ -245,64 +244,71 @@ async def main(config: CliConfig):
url_log.info(f'Processing "{url}"')
try:
download_queue: list[DownloadItem] = []
async for media in downloader.get_download_item_from_url(url):
download_queue.append(media)
except GamdlInterfaceUrlParseError as e:
url_log.warning(f"{e}")
continue
except Exception as e:
url_log.error(f'Error processing "{url}": {e}')
error_count += 1
if not config.no_exceptions:
traceback.print_exc()
continue
async for download_item in downloader.get_download_item_from_url(url):
media_index = download_item.media.index + 1
media_total = download_item.media.total or "-"
for download_index, download_item in enumerate(
download_queue,
1,
):
track_log = logger.bind(
action=f"Track {download_index:>3}/{len(download_queue):<3}"
)
media_title = (
download_item.media.media_metadata["attributes"]["name"]
if download_item.media.media_metadata
and download_item.media.media_metadata.get("attributes", {}).get("name")
else "Unknown Title"
)
track_log.info(f'Downloading "{media_title}"')
try:
await downloader.download(download_item)
except (
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceArtistMediaTypeError,
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderFlatFilterExcludedError,
) as e:
track_log.warning(f'Skipping "{media_title}": {e}')
continue
except Exception as e:
error_count += 1
track_log.error(f'Error downloading "{media_title}"')
if not config.no_exceptions:
traceback.print_exc()
if (
database
and download_item.media.media_metadata
and download_item.final_path
):
database.add(
download_item.media.media_metadata["id"],
download_item.final_path,
track_log = logger.bind(
action=f"Track {media_index:>3}/{media_total:<3}"
)
media_title = (
download_item.media.media_metadata["attributes"]["name"]
if download_item.media.media_metadata
and download_item.media.media_metadata.get("attributes", {}).get(
"name"
)
else "Unknown Title"
)
media_type = (
download_item.media.media_metadata["type"]
if download_item.media.media_metadata
else None
)
if download_item.media.partial and media_type in {
None,
"songs",
"library-songs",
"music-videos",
"library-music-videos",
"uploaded-videos",
}:
track_log.info(f'Downloading "{media_title}"')
try:
await downloader.download(download_item)
except (
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceArtistMediaTypeError,
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
GamdlInterfaceFlatFilterExcludedError,
) as e:
track_log.warning(f'Skipping "{media_title}": {e}')
continue
except Exception as e:
error_count += 1
track_log.exception(f'Error downloading "{media_title}"')
if (
database
and download_item.media.media_metadata
and download_item.final_path
):
database.add(
download_item.media.media_metadata["id"],
download_item.final_path,
)
except GamdlInterfaceUrlParseError as e:
url_log.error(f"{e}")
continue
except Exception as e:
url_log.exception(f'Error processing "{url}": {e}')
error_count += 1
continue
logger.info(f"Finished with {error_count} error(s)")
+16 -8
View File
@@ -210,6 +210,22 @@ class CliConfig:
),
),
]
use_wrapper: Annotated[
bool,
option(
"--use-wrapper",
help="Use wrapper for decrypting songs",
is_flag=True,
),
]
wrapper_m3u8_ip: Annotated[
str,
option(
"--wrapper-m3u8-ip",
help="Wrapper m3u8 IP address and port",
default=base_interface_create_sig.parameters["wrapper_m3u8_ip"].default,
),
]
# Song Interface Options
synced_lyrics_format: Annotated[
SyncedLyricsFormat,
@@ -328,14 +344,6 @@ class CliConfig:
default=base_downloader_sig.parameters["mp4box_path"].default,
),
]
use_wrapper: Annotated[
bool,
option(
"--use-wrapper",
help="Use wrapper for decrypting songs",
is_flag=True,
),
]
wrapper_decrypt_ip: Annotated[
str,
option(
-2
View File
@@ -27,7 +27,6 @@ class AppleMusicBaseDownloader:
mp4decrypt_path: str = "mp4decrypt",
ffmpeg_path: str = "ffmpeg",
mp4box_path: str = "MP4Box",
use_wrapper: bool = False,
wrapper_decrypt_ip: str = "127.0.0.1:10020",
download_mode: DownloadMode = DownloadMode.YTDLP,
album_folder_template: str = "{album_artist}/{album}",
@@ -50,7 +49,6 @@ class AppleMusicBaseDownloader:
self.mp4decrypt_path = mp4decrypt_path
self.ffmpeg_path = ffmpeg_path
self.mp4box_path = mp4box_path
self.use_wrapper = use_wrapper
self.wrapper_decrypt_ip = wrapper_decrypt_ip
self.download_mode = download_mode
self.album_folder_template = album_folder_template
+6 -7
View File
@@ -5,12 +5,10 @@ from typing import AsyncGenerator
import structlog
from ..interface.types import AppleMusicMedia
from .constants import TEMP_PATH_TEMPLATE
from .enums import DownloadMode, RemuxMode
from .exceptions import (
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderFlatFilterExcludedError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderSyncedLyricsOnlyError,
)
@@ -60,7 +58,10 @@ class AppleMusicDownloader:
self,
media: AppleMusicMedia,
) -> DownloadItem:
if media.error or media.flat_filter_result:
if media.error:
return DownloadItem(media)
if media.partial:
return DownloadItem(media)
elif media.media_metadata["type"] in {"songs", "library-songs"}:
@@ -80,10 +81,8 @@ class AppleMusicDownloader:
if item.media.error:
raise item.media.error
if item.media.flat_filter_result:
raise GamdlDownloaderFlatFilterExcludedError(
item.media.media_metadata["id"]
)
if item.media.partial:
return
await self._initial_processing(item)
await self._download(item)
-5
View File
@@ -18,8 +18,3 @@ class GamdlDownloaderMediaFileExistsError(GamdlDownloaderError):
class GamdlDownloaderDependencyNotFoundError(GamdlDownloaderError):
def __init__(self, dependency_name: str) -> None:
super().__init__(f"Required dependency not found: {dependency_name}")
class GamdlDownloaderFlatFilterExcludedError(GamdlDownloaderError):
def __init__(self, media_id: str) -> None:
super().__init__(f"Media is excluded by flat filter: {media_id}")
+1 -1
View File
@@ -96,7 +96,7 @@ class AppleMusicSongDownloader:
staged_path=staged_path,
)
if self.base.use_wrapper and not legacy:
if self.base.interface.base.use_wrapper and not legacy:
await self._decrypt_amdecrypt(
encrypted_path,
staged_path,
+24 -4
View File
@@ -29,12 +29,16 @@ class AppleMusicBaseInterface:
itunes_api: ItunesApi,
cover_format: CoverFormat,
cover_size: int,
use_wrapper: bool,
wrapper_m3u8_ip: str,
cdm: Cdm,
) -> None:
self.apple_music_api = apple_music_api
self.itunes_api = itunes_api
self.cover_format = cover_format
self.cover_size = cover_size
self.use_wrapper = use_wrapper
self.wrapper_m3u8_ip = wrapper_m3u8_ip
self.cdm = cdm
@staticmethod
@@ -103,14 +107,28 @@ class AppleMusicBaseInterface:
return response
@staticmethod
def format_cover(
template_cover_url: str,
cover_size: int,
cover_format: CoverFormat,
) -> str:
return re.sub(
r"/\{w\}x\{h\}([a-z]{2})\.jpg",
f"/{cover_size}x{cover_size}bb.{cover_format.value}",
template_cover_url,
)
@classmethod
async def create(
cls,
apple_music_api: AppleMusicApi,
cover_format: CoverFormat = CoverFormat.JPG,
cover_size: int = 1200,
itunes_api: ItunesApi | None = None,
use_wrapper: bool = False,
wrapper_m3u8_ip: str = "127.0.0.1:20020",
wvd_path: str | None = None,
itunes_api: ItunesApi | None = None,
):
itunes_api = itunes_api or await ItunesApi.create(
storefront=apple_music_api.storefront,
@@ -123,6 +141,8 @@ class AppleMusicBaseInterface:
itunes_api=itunes_api,
cover_format=cover_format,
cover_size=cover_size,
use_wrapper=use_wrapper,
wrapper_m3u8_ip=wrapper_m3u8_ip,
cdm=cdm,
)
return base
@@ -246,10 +266,10 @@ class AppleMusicBaseInterface:
if self.cover_format == CoverFormat.RAW:
cover_url = template_url
else:
cover_url = re.sub(
r"/\{w\}x\{h\}([a-z]{2})\.jpg",
f"/{self.cover_size}x{self.cover_size}bb.{self.cover_format.value}",
cover_url = self.format_cover(
template_url,
self.cover_size,
self.cover_format,
)
cover_file_extension = await self._get_cover_file_extension(cover_url)
+11 -2
View File
@@ -38,5 +38,14 @@ class GamdlInterfaceUrlParseError(GamdlInterfaceError):
class GamdlInterfaceArtistMediaTypeError(GamdlInterfaceError):
def __init__(self, media_type: str):
super().__init__(f"Artist has no media of type: {media_type}")
def __init__(self, media_id: str, media_type: str):
super().__init__(
f"Artist has no media of type (media ID: {media_id}): {media_type}"
)
class GamdlInterfaceFlatFilterExcludedError(GamdlInterfaceError):
def __init__(self, media_id: str, result: Any):
super().__init__(f"Media excluded by flat filter: {media_id}")
self.result = result
+212 -352
View File
@@ -10,6 +10,7 @@ from .exceptions import (
GamdlInterfaceMediaNotAllowedError,
GamdlInterfaceUrlParseError,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceFlatFilterExcludedError,
)
from .music_video import AppleMusicMusicVideoInterface
from .song import AppleMusicSongInterface
@@ -32,7 +33,7 @@ class AppleMusicInterface:
Callable[[ArtistMediaType, list[dict]], list[dict] | None] | None
) = None,
flat_filter_function: Callable[[dict], Any] | None = None,
concurrency: int = 5,
concurrency: int = 1,
disallowed_media_types: list[str] | None = None,
) -> None:
self.song = song
@@ -64,172 +65,128 @@ class AppleMusicInterface:
return url_match
async def _run_flat_filter(self, media: AppleMusicMedia) -> None:
if not self.flat_filter_function or not media.partial:
return
result = self.flat_filter_function(media.media_metadata)
if asyncio.iscoroutine(result):
result = await result
if result:
raise GamdlInterfaceFlatFilterExcludedError(media.media_id, result)
def _run_media_type_filter(self, media: AppleMusicMedia) -> None:
if not self.disallowed_media_types or not media.partial:
return
if media.media_metadata["type"] in self.disallowed_media_types:
raise GamdlInterfaceMediaNotAllowedError(
media.media_metadata["type"],
media.media_id,
)
async def _collect_generator(
self, generator_or_coroutine: AsyncGenerator[AppleMusicMedia, None]
) -> list[AppleMusicMedia]:
results = []
async for result in generator_or_coroutine:
results.append(result)
return results
async def _get_song_media(
self,
media_id: str | None = None,
media_id: str,
index: int | None = None,
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
if not media_metadata:
try:
media_metadata = (
await self.base.apple_music_api.get_song(
media_id,
)
)[
"data"
][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
if not media_id:
media_id = self.base.parse_catalog_media_id(media_metadata)
if index is not None:
media.index = index
if total is not None:
media.total = total
base_media = AppleMusicMedia(media_id, media_metadata)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
try:
return await self.song.get_media(
media_metadata,
playlist_metadata,
playlist_track,
)
async for media in self.song.get_media(media):
yield media
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_music_video_media(
self,
media_id: str | None = None,
media_id: str,
index: int | None = None,
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
if not media_metadata:
try:
media_metadata = (
await self.base.apple_music_api.get_music_video(
media_id,
)
)["data"][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
if not media_id:
media_id = self.music_video.parse_catalog_media_id(media_metadata)
if index is not None:
media.index = index
if total is not None:
media.total = total
base_media = AppleMusicMedia(media_id, media_metadata)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
try:
return await self.music_video.get_media(
media_metadata,
playlist_metadata,
playlist_track,
)
async for media in self.music_video.get_media(media):
yield media
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_uploaded_video_media(
self,
media_id: str,
) -> AppleMusicMedia:
try:
media_metadata = (
await self.base.apple_music_api.get_uploaded_video(
media_id,
)
)["data"][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
base_media = AppleMusicMedia(media_id, media_metadata)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
try:
return await self.uploaded_video.get_media(media_metadata)
async for media in self.music_video.get_media(media):
yield
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_album_media(
self,
media_id: str,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_library_album(
media_id,
)
@@ -238,76 +195,57 @@ class AppleMusicInterface:
media_id,
)
)["data"][0]
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
base_media.partial = False
base_media.error = e
yield base_media
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
yield base_media
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
tracks = media_metadata["relationships"]["tracks"]["data"]
tracks = base_media.media_metadata["relationships"]["tracks"]["data"]
tasks = [
(
self._get_song_media(
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
)
)
for track in tracks
for index, track in enumerate(tracks)
]
if self.concurrency == 1:
for task in tasks:
async for result in task:
yield result
async for media in task:
yield media
else:
for task in await safe_gather(*tasks, limit=self.concurrency):
yield task
collected_tasks = [self._collect_generator(task) for task in tasks]
batches = await safe_gather(*collected_tasks, limit=self.concurrency)
for batch in batches:
for media in batch:
yield media
async def _get_playlist_media(
self,
media_id: str,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_library_playlist(
media_id,
)
@@ -316,75 +254,42 @@ class AppleMusicInterface:
media_id,
)
)["data"][0]
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
tracks = media_metadata["relationships"]["tracks"]["data"]
next_uri = media_metadata["relationships"]["tracks"].get("next")
href_uri = media_metadata["relationships"]["tracks"].get("href")
while next_uri:
try:
tracks = base_media.media_metadata["relationships"]["tracks"]["data"]
next_uri = base_media.media_metadata["relationships"]["tracks"].get("next")
href_uri = base_media.media_metadata["relationships"]["tracks"].get("href")
while next_uri:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=e,
)
return
tracks.extend(extended_data["data"])
next_uri = extended_data.get("next")
tracks.extend(extended_data["data"])
next_uri = extended_data.get("next")
except Exception as e:
base_media.partial = False
base_media.error = e
yield base_media
return
yield base_media
tasks = [
(
self._get_song_media(
media_id=track["id"],
index=index,
media_metadata=track,
playlist_metadata=media_metadata,
playlist_track=index + 1,
playlist_metadata=base_media.media_metadata,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
media_id=track["id"],
index=index,
media_metadata=track,
playlist_metadata=media_metadata,
playlist_track=index + 1,
playlist_metadata=base_media.media_metadata,
)
)
for index, track in enumerate(tracks)
@@ -392,25 +297,62 @@ class AppleMusicInterface:
if self.concurrency == 1:
for task in tasks:
async for result in task:
yield result
async for media in task:
yield media
else:
for task in await safe_gather(*tasks, limit=self.concurrency):
yield task
collected_tasks = [self._collect_generator(task) for task in tasks]
batches = await safe_gather(*collected_tasks, limit=self.concurrency)
for batch in batches:
for media in batch:
yield media
async def _get_artist_media(
self,
media_id: str,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_artist(
media_id,
)
)[
"data"
][0]
)["data"][0]
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
if self.artist_select_media_type_function:
artist_media_type = self.artist_select_media_type_function(
list(ArtistMediaType),
base_media.media_metadata,
)
if asyncio.iscoroutine(artist_media_type):
artist_media_type = await artist_media_type
else:
artist_media_type = list(ArtistMediaType)[0]
relation_key, type_key = artist_media_type.path_key
items_relation = base_media.media_metadata.get(relation_key, {}).get(
type_key, {}
)
items = items_relation.get("data", [])
if not items:
raise GamdlInterfaceArtistMediaTypeError(
base_media.media_id,
str(artist_media_type),
)
next_uri = items_relation.get("next")
href_uri = items_relation.get("href")
while next_uri:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
items.extend(extended_data.get("data", []))
next_uri = extended_data.get("next")
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
@@ -419,73 +361,7 @@ class AppleMusicInterface:
)
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
if self.artist_select_media_type_function:
artist_media_type = self.artist_select_media_type_function(
list(ArtistMediaType),
media_metadata,
)
if asyncio.iscoroutine(artist_media_type):
artist_media_type = await artist_media_type
else:
artist_media_type = list(ArtistMediaType)[0]
relation_key, type_key = artist_media_type.path_key
items_relation = media_metadata.get(relation_key, {}).get(type_key, {})
items = items_relation.get("data", [])
if not items:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceArtistMediaTypeError(str(artist_media_type)),
)
return
next_uri = items_relation.get("next")
href_uri = items_relation.get("href")
while next_uri:
try:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=e,
)
return
items.extend(extended_data.get("data", []))
next_uri = extended_data.get("next")
yield base_media
if self.artist_select_items_function:
selected_items = self.artist_select_items_function(
@@ -498,60 +374,40 @@ class AppleMusicInterface:
selected_items = items[:1]
tasks = []
for item in selected_items:
for index, item in enumerate(selected_items):
if item["type"] in {"songs", "library-songs"}:
tasks.append(
(
item["type"],
self._get_song_media(
media_id=item["id"],
media_metadata=item,
),
self._get_song_media(
media_id=item["id"],
index=index,
total=len(selected_items),
media_metadata=item,
)
)
elif item["type"] in {"albums", "library-albums"}:
tasks.append(
(
item["type"],
self._get_album_media(
media_id=item["id"],
),
self._get_album_media(
media_id=item["id"],
)
)
else:
tasks.append(
(
item["type"],
self._get_music_video_media(
media_id=item["id"],
media_metadata=item,
),
self._get_music_video_media(
media_id=item["id"],
index=index,
total=len(selected_items),
media_metadata=item,
)
)
if self.concurrency == 1:
for item_type, task in tasks:
if item_type in {"albums", "library-albums"}:
async for result in task:
yield result
else:
yield await task
for task in tasks:
async for media in task:
yield media
else:
async def _collect_generator(generator_or_coroutine, item_type):
if item_type in {"albums", "library-albums"}:
results = []
async for result in generator_or_coroutine:
results.append(result)
return results
else:
return [await generator_or_coroutine]
collected_tasks = [
_collect_generator(task, item_type) for item_type, task in tasks
]
for batch in await safe_gather(*collected_tasks, limit=self.concurrency):
collected_tasks = [self._collect_generator(task) for task in tasks]
batches = await safe_gather(*collected_tasks, limit=self.concurrency)
for batch in batches:
for media in batch:
yield media
@@ -570,16 +426,20 @@ class AppleMusicInterface:
)
if url_info.type == "song" or url_info.sub_id:
media = await self._get_song_media(
async for media in self._get_song_media(
media_id=url_info.sub_id or url_info.id,
)
yield media
index=0,
total=1,
):
yield media
elif url_info.type == "music-video":
media = await self._get_music_video_media(
async for media in self._get_music_video_media(
media_id=url_info.id,
)
yield media
index=0,
total=1,
):
yield media
elif url_info.type == "album" or url_info.library_type == "albums":
async for media in self._get_album_media(
@@ -596,10 +456,10 @@ class AppleMusicInterface:
yield media
elif url_info.type == "post":
media = await self._get_uploaded_video_media(
async for media in self._get_uploaded_video_media(
media_id=url_info.id,
)
yield media
):
yield media
elif url_info.type == "artist":
async for media in self._get_artist_media(
+22 -19
View File
@@ -1,6 +1,6 @@
import asyncio
import urllib.parse
from typing import Callable
from typing import AsyncGenerator, Callable
import m3u8
import structlog
@@ -378,35 +378,36 @@ class AppleMusicMusicVideoInterface:
async def get_media(
self,
music_video_metadata: dict,
playlist_metadata: dict | None = None,
playlist_track: dict | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.base.parse_catalog_media_id(music_video_metadata),
media_metadata=music_video_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_music_video(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(music_video_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
if media.playlist_metadata:
media.playlist_tags = self.base.get_playlist_tags(
playlist_metadata,
playlist_track,
media.playlist_metadata,
media.index,
)
media.cover = await self.base.get_cover(music_video_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(music_video_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(media.media_metadata)
media.tags = await self.get_tags(
music_video_metadata,
media.media_metadata,
itunes_page_metadata,
)
media.stream_info = await self.get_stream_info(
music_video_metadata,
media.media_metadata,
itunes_page_metadata,
)
if not media.stream_info:
@@ -423,4 +424,6 @@ class AppleMusicMusicVideoInterface:
media.decryption_key = await self.get_decryption_key(media.stream_info)
return media
media.partial = False
yield media
+48 -26
View File
@@ -3,7 +3,8 @@ import base64
import datetime
import json
import re
from typing import Callable
import struct
from typing import AsyncGenerator, Callable
from xml.dom import minidom
from xml.etree import ElementTree
@@ -38,7 +39,6 @@ class AppleMusicSongInterface:
synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC,
codec_priority: list[SongCodec] = [SongCodec.AAC_LEGACY],
use_album_date: bool = False,
skip_decryption_key_non_legacy: bool = False,
skip_stream_info: bool = False,
ask_codec_function: Callable[[list[dict]], dict | None] | None = None,
):
@@ -46,7 +46,6 @@ class AppleMusicSongInterface:
self.synced_lyrics_format = synced_lyrics_format
self.codec_priority = codec_priority
self.use_album_date = use_album_date
self.skip_decryption_key_non_legacy = skip_decryption_key_non_legacy
self.skip_stream_info = skip_stream_info
self.ask_codec_function = ask_codec_function
@@ -258,6 +257,25 @@ class AppleMusicSongInterface:
else:
return await self._get_stream_info(song_metadata, codec)
async def get_wrapper_m3u8(self, adam_id: str) -> str | None:
host, port = self.base.wrapper_m3u8_ip.split(":")
reader, writer = await asyncio.open_connection(host, port)
data = struct.pack("B", len(adam_id)) + adam_id.encode()
writer.write(data)
await writer.drain()
response = await reader.readuntil(b"\n")
m3u8_url = response.decode().strip()
writer.close()
await writer.wait_closed()
if m3u8_url:
return m3u8_url
return None
async def _get_stream_info(
self,
song_metadata: dict,
@@ -272,8 +290,10 @@ class AppleMusicSongInterface:
)
)["data"][0]
m3u8_master_url = song_metadata["attributes"]["extendedAssetUrls"].get(
"enhancedHls"
m3u8_master_url = (
await self.get_wrapper_m3u8(self.base.parse_catalog_media_id(song_metadata))
if self.base.use_wrapper
else song_metadata["attributes"]["extendedAssetUrls"].get("enhancedHls")
)
if not m3u8_master_url:
return None
@@ -455,30 +475,31 @@ class AppleMusicSongInterface:
async def get_media(
self,
song_metadata: dict,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.base.parse_catalog_media_id(song_metadata),
media_metadata=song_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_song(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(song_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(
media_id=media.media_id,
)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
if media.playlist_metadata:
media.playlist_tags = self.base.get_playlist_tags(
playlist_metadata,
playlist_track,
media.playlist_metadata,
media.index,
)
media.cover = await self.base.get_cover(song_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
media.lyrics = await self.get_lyrics(song_metadata)
media.lyrics = await self.get_lyrics(media.media_metadata)
webplayback = await self.base.apple_music_api.get_webplayback(media.media_id)
@@ -489,7 +510,7 @@ class AppleMusicSongInterface:
if not self.skip_stream_info:
media.stream_info = await self.get_stream_info(
song_metadata,
media.media_metadata,
webplayback,
)
if not media.stream_info:
@@ -499,17 +520,16 @@ class AppleMusicSongInterface:
)
if (
not self.skip_decryption_key_non_legacy
not self.base.use_wrapper
and not media.stream_info.audio_track.widevine_pssh
) or (
self.skip_decryption_key_non_legacy
and not media.stream_info.audio_track.fairplay_key
self.base.use_wrapper and not media.stream_info.audio_track.fairplay_key
):
raise GamdlInterfaceDecryptionNotAvailableError(media_id=media.media_id)
if (
media.stream_info.audio_track.widevine_pssh
and not self.skip_decryption_key_non_legacy
and not self.base.use_wrapper
) or media.stream_info.audio_track.legacy:
media.decryption_key = DecryptionKeyAv(
audio_track=await self.base.get_decryption_key(
@@ -518,4 +538,6 @@ class AppleMusicSongInterface:
)
)
return media
media.partial = False
yield media
+4 -2
View File
@@ -155,7 +155,10 @@ class Cover:
@dataclass
class AppleMusicMedia:
media_id: str
media_metadata: dict
index: int = 0
total: int = 0
partial: bool = True
media_metadata: dict | None = None
error: BaseException | None = None
playlist_metadata: dict | None = None
playlist_tags: PlaylistTags | None = None
@@ -165,7 +168,6 @@ class AppleMusicMedia:
tags: MediaTags | None = None
stream_info: StreamInfoAv | None = None
decryption_key: DecryptionKeyAv | None = None
flat_filter_result: Any = None
@dataclass
+18 -11
View File
@@ -1,5 +1,6 @@
import asyncio
from collections.abc import Callable
from typing import AsyncGenerator
import structlog
@@ -105,22 +106,28 @@ class AppleMusicUploadedVideoInterface:
async def get_media(
self,
uploaded_video_metadata: dict,
) -> AppleMusicMedia:
media = AppleMusicMedia(
uploaded_video_metadata["id"],
uploaded_video_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_uploaded_video(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(uploaded_video_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
media.cover = await self.base.get_cover(uploaded_video_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
media.stream_info = await self.get_stream_info(uploaded_video_metadata)
media.stream_info = await self.get_stream_info(media.media_metadata)
if not media.stream_info:
raise GamdlInterfaceFormatNotAvailableError(media.media_id)
media.tags = self.get_tags(uploaded_video_metadata)
media.tags = self.get_tags(media.media_metadata)
return media
media.partial = False
yield media
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "gamdl"
version = "3.0"
version = "3.3"
description = "A command-line app for downloading Apple Music songs, music videos and post videos."
readme = "README.md"
license = "MIT"
Generated
+1 -1
View File
@@ -223,7 +223,7 @@ wheels = [
[[package]]
name = "gamdl"
version = "3.0"
version = "3.3"
source = { virtual = "." }
dependencies = [
{ name = "async-lru" },