Compare commits

...

10 Commits

Author SHA1 Message Date
Rafael Moraes 9375c2fccd Bump version to 3.3 2026-04-24 19:48:58 -03:00
Rafael Moraes c83e47df0c Remove total arg from media fetch calls 2026-04-24 19:48:27 -03:00
Rafael Moraes 715820e357 Bump version to 3.2 2026-04-24 16:17:49 -03:00
Rafael Moraes 137a739af2 Collect async generators for concurrency 2026-04-24 16:05:37 -03:00
Rafael Moraes 23220d1827 Limit download logging and use interface exception 2026-04-24 15:48:14 -03:00
Rafael Moraes 3c7ea272af Skip partial media; Remove flat filter exception 2026-04-24 15:44:40 -03:00
Rafael Moraes 34a92b6efc Refactor interface media fetching 2026-04-24 15:44:19 -03:00
Rafael Moraes 3a907cb76c Remove skip_decryption_key_non_legacy arg 2026-04-24 13:02:22 -03:00
Rafael Moraes 90646e7193 Use base.use_wrapper for decryption checks 2026-04-24 13:02:07 -03:00
Rafael Moraes 3b2875ccd1 Remove use_wrapper parameter and attribute 2026-04-24 12:59:01 -03:00
13 changed files with 311 additions and 457 deletions
+1 -1
View File
@@ -1 +1 @@
__version__ = "3.1"
__version__ = "3.3"
+16 -5
View File
@@ -17,9 +17,7 @@ from ..downloader import (
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
DownloadItem,
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderFlatFilterExcludedError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderSyncedLyricsOnlyError,
)
@@ -31,6 +29,7 @@ from ..interface import (
AppleMusicUploadedVideoInterface,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceFlatFilterExcludedError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceUrlParseError,
@@ -152,7 +151,6 @@ async def main(config: CliConfig):
synced_lyrics_format=config.synced_lyrics_format,
codec_priority=config.song_codec_piority,
use_album_date=config.use_album_date,
skip_decryption_key_non_legacy=config.use_wrapper,
skip_stream_info=config.synced_lyrics_only,
ask_codec_function=interactive_prompts.ask_song_codec,
)
@@ -262,8 +260,21 @@ async def main(config: CliConfig):
)
else "Unknown Title"
)
media_type = (
download_item.media.media_metadata["type"]
if download_item.media.media_metadata
else None
)
track_log.info(f'Downloading "{media_title}"')
if download_item.media.partial and media_type in {
None,
"songs",
"library-songs",
"music-videos",
"library-music-videos",
"uploaded-videos",
}:
track_log.info(f'Downloading "{media_title}"')
try:
await downloader.download(download_item)
@@ -275,7 +286,7 @@ async def main(config: CliConfig):
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderFlatFilterExcludedError,
GamdlInterfaceFlatFilterExcludedError,
) as e:
track_log.warning(f'Skipping "{media_title}": {e}')
continue
-2
View File
@@ -27,7 +27,6 @@ class AppleMusicBaseDownloader:
mp4decrypt_path: str = "mp4decrypt",
ffmpeg_path: str = "ffmpeg",
mp4box_path: str = "MP4Box",
use_wrapper: bool = False,
wrapper_decrypt_ip: str = "127.0.0.1:10020",
download_mode: DownloadMode = DownloadMode.YTDLP,
album_folder_template: str = "{album_artist}/{album}",
@@ -50,7 +49,6 @@ class AppleMusicBaseDownloader:
self.mp4decrypt_path = mp4decrypt_path
self.ffmpeg_path = ffmpeg_path
self.mp4box_path = mp4box_path
self.use_wrapper = use_wrapper
self.wrapper_decrypt_ip = wrapper_decrypt_ip
self.download_mode = download_mode
self.album_folder_template = album_folder_template
+6 -7
View File
@@ -5,12 +5,10 @@ from typing import AsyncGenerator
import structlog
from ..interface.types import AppleMusicMedia
from .constants import TEMP_PATH_TEMPLATE
from .enums import DownloadMode, RemuxMode
from .exceptions import (
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderFlatFilterExcludedError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderSyncedLyricsOnlyError,
)
@@ -60,7 +58,10 @@ class AppleMusicDownloader:
self,
media: AppleMusicMedia,
) -> DownloadItem:
if media.error or media.flat_filter_result:
if media.error:
return DownloadItem(media)
if media.partial:
return DownloadItem(media)
elif media.media_metadata["type"] in {"songs", "library-songs"}:
@@ -80,10 +81,8 @@ class AppleMusicDownloader:
if item.media.error:
raise item.media.error
if item.media.flat_filter_result:
raise GamdlDownloaderFlatFilterExcludedError(
item.media.media_metadata["id"]
)
if item.media.partial:
return
await self._initial_processing(item)
await self._download(item)
-5
View File
@@ -18,8 +18,3 @@ class GamdlDownloaderMediaFileExistsError(GamdlDownloaderError):
class GamdlDownloaderDependencyNotFoundError(GamdlDownloaderError):
def __init__(self, dependency_name: str) -> None:
super().__init__(f"Required dependency not found: {dependency_name}")
class GamdlDownloaderFlatFilterExcludedError(GamdlDownloaderError):
def __init__(self, media_id: str) -> None:
super().__init__(f"Media is excluded by flat filter: {media_id}")
+11 -2
View File
@@ -38,5 +38,14 @@ class GamdlInterfaceUrlParseError(GamdlInterfaceError):
class GamdlInterfaceArtistMediaTypeError(GamdlInterfaceError):
def __init__(self, media_type: str):
super().__init__(f"Artist has no media of type: {media_type}")
def __init__(self, media_id: str, media_type: str):
super().__init__(
f"Artist has no media of type (media ID: {media_id}): {media_type}"
)
class GamdlInterfaceFlatFilterExcludedError(GamdlInterfaceError):
def __init__(self, media_id: str, result: Any):
super().__init__(f"Media excluded by flat filter: {media_id}")
self.result = result
+208 -376
View File
@@ -10,6 +10,7 @@ from .exceptions import (
GamdlInterfaceMediaNotAllowedError,
GamdlInterfaceUrlParseError,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceFlatFilterExcludedError,
)
from .music_video import AppleMusicMusicVideoInterface
from .song import AppleMusicSongInterface
@@ -32,7 +33,7 @@ class AppleMusicInterface:
Callable[[ArtistMediaType, list[dict]], list[dict] | None] | None
) = None,
flat_filter_function: Callable[[dict], Any] | None = None,
concurrency: int = 5,
concurrency: int = 1,
disallowed_media_types: list[str] | None = None,
) -> None:
self.song = song
@@ -64,186 +65,128 @@ class AppleMusicInterface:
return url_match
async def _run_flat_filter(self, media: AppleMusicMedia) -> None:
if not self.flat_filter_function or not media.partial:
return
result = self.flat_filter_function(media.media_metadata)
if asyncio.iscoroutine(result):
result = await result
if result:
raise GamdlInterfaceFlatFilterExcludedError(media.media_id, result)
def _run_media_type_filter(self, media: AppleMusicMedia) -> None:
if not self.disallowed_media_types or not media.partial:
return
if media.media_metadata["type"] in self.disallowed_media_types:
raise GamdlInterfaceMediaNotAllowedError(
media.media_metadata["type"],
media.media_id,
)
async def _collect_generator(
self, generator_or_coroutine: AsyncGenerator[AppleMusicMedia, None]
) -> list[AppleMusicMedia]:
results = []
async for result in generator_or_coroutine:
results.append(result)
return results
async def _get_song_media(
self,
index: int,
total: int = 0,
media_id: str | None = None,
media_id: str,
index: int | None = None,
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
if not media_metadata:
try:
media_metadata = (
await self.base.apple_music_api.get_song(
media_id,
)
)[
"data"
][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
index=index,
total=total,
error=e,
)
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
if not media_id:
media_id = self.base.parse_catalog_media_id(media_metadata)
if index is not None:
media.index = index
if total is not None:
media.total = total
base_media = AppleMusicMedia(media_id, media_metadata, index, total)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
try:
media = await self.song.get_media(
media_metadata,
playlist_metadata,
playlist_track,
)
media.index = index
media.total = total
return media
async for media in self.song.get_media(media):
yield media
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_music_video_media(
self,
index: int,
total: int = 0,
media_id: str | None = None,
media_id: str,
index: int | None = None,
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
if not media_metadata:
try:
media_metadata = (
await self.base.apple_music_api.get_music_video(
media_id,
)
)["data"][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
index=index,
total=total,
error=e,
)
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
if not media_id:
media_id = self.base.parse_catalog_media_id(media_metadata)
if index is not None:
media.index = index
if total is not None:
media.total = total
base_media = AppleMusicMedia(media_id, media_metadata, index, total)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
try:
media = await self.music_video.get_media(
media_metadata,
playlist_metadata,
playlist_track,
)
media.index = index
media.total = total
return media
async for media in self.music_video.get_media(media):
yield media
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_uploaded_video_media(
self,
media_id: str,
) -> AppleMusicMedia:
try:
media_metadata = (
await self.base.apple_music_api.get_uploaded_video(
media_id,
)
)["data"][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
base_media = AppleMusicMedia(media_id, media_metadata)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
try:
return await self.uploaded_video.get_media(media_metadata)
async for media in self.music_video.get_media(media):
yield
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_album_media(
self,
media_id: str,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_library_album(
media_id,
)
@@ -252,59 +195,32 @@ class AppleMusicInterface:
media_id,
)
)["data"][0]
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
base_media.partial = False
base_media.error = e
yield base_media
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
yield base_media
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
tracks = media_metadata["relationships"]["tracks"]["data"]
tracks = base_media.media_metadata["relationships"]["tracks"]["data"]
tasks = [
(
self._get_song_media(
index=index,
total=media_metadata["attributes"]["trackCount"],
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
index=index,
total=media_metadata["attributes"]["trackCount"],
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
)
)
for index, track in enumerate(tracks)
@@ -312,20 +228,24 @@ class AppleMusicInterface:
if self.concurrency == 1:
for task in tasks:
async for result in task:
yield result
async for media in task:
yield media
else:
for task in await safe_gather(*tasks, limit=self.concurrency):
yield task
collected_tasks = [self._collect_generator(task) for task in tasks]
batches = await safe_gather(*collected_tasks, limit=self.concurrency)
for batch in batches:
for media in batch:
yield media
async def _get_playlist_media(
self,
media_id: str,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_library_playlist(
media_id,
)
@@ -334,77 +254,42 @@ class AppleMusicInterface:
media_id,
)
)["data"][0]
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
tracks = media_metadata["relationships"]["tracks"]["data"]
next_uri = media_metadata["relationships"]["tracks"].get("next")
href_uri = media_metadata["relationships"]["tracks"].get("href")
while next_uri:
try:
tracks = base_media.media_metadata["relationships"]["tracks"]["data"]
next_uri = base_media.media_metadata["relationships"]["tracks"].get("next")
href_uri = base_media.media_metadata["relationships"]["tracks"].get("href")
while next_uri:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=e,
)
return
tracks.extend(extended_data["data"])
next_uri = extended_data.get("next")
tracks.extend(extended_data["data"])
next_uri = extended_data.get("next")
except Exception as e:
base_media.partial = False
base_media.error = e
yield base_media
return
yield base_media
tasks = [
(
self._get_song_media(
index=index,
media_id=track["id"],
index=index,
media_metadata=track,
playlist_metadata=media_metadata,
playlist_track=index + 1,
playlist_metadata=base_media.media_metadata,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
index=index,
media_id=track["id"],
index=index,
media_metadata=track,
playlist_metadata=media_metadata,
playlist_track=index + 1,
playlist_metadata=base_media.media_metadata,
)
)
for index, track in enumerate(tracks)
@@ -412,25 +297,62 @@ class AppleMusicInterface:
if self.concurrency == 1:
for task in tasks:
async for result in task:
yield result
async for media in task:
yield media
else:
for task in await safe_gather(*tasks, limit=self.concurrency):
yield task
collected_tasks = [self._collect_generator(task) for task in tasks]
batches = await safe_gather(*collected_tasks, limit=self.concurrency)
for batch in batches:
for media in batch:
yield media
async def _get_artist_media(
self,
media_id: str,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_artist(
media_id,
)
)[
"data"
][0]
)["data"][0]
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
if self.artist_select_media_type_function:
artist_media_type = self.artist_select_media_type_function(
list(ArtistMediaType),
base_media.media_metadata,
)
if asyncio.iscoroutine(artist_media_type):
artist_media_type = await artist_media_type
else:
artist_media_type = list(ArtistMediaType)[0]
relation_key, type_key = artist_media_type.path_key
items_relation = base_media.media_metadata.get(relation_key, {}).get(
type_key, {}
)
items = items_relation.get("data", [])
if not items:
raise GamdlInterfaceArtistMediaTypeError(
base_media.media_id,
str(artist_media_type),
)
next_uri = items_relation.get("next")
href_uri = items_relation.get("href")
while next_uri:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
items.extend(extended_data.get("data", []))
next_uri = extended_data.get("next")
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
@@ -439,73 +361,7 @@ class AppleMusicInterface:
)
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
if self.artist_select_media_type_function:
artist_media_type = self.artist_select_media_type_function(
list(ArtistMediaType),
media_metadata,
)
if asyncio.iscoroutine(artist_media_type):
artist_media_type = await artist_media_type
else:
artist_media_type = list(ArtistMediaType)[0]
relation_key, type_key = artist_media_type.path_key
items_relation = media_metadata.get(relation_key, {}).get(type_key, {})
items = items_relation.get("data", [])
if not items:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceArtistMediaTypeError(str(artist_media_type)),
)
return
next_uri = items_relation.get("next")
href_uri = items_relation.get("href")
while next_uri:
try:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=e,
)
return
items.extend(extended_data.get("data", []))
next_uri = extended_data.get("next")
yield base_media
if self.artist_select_items_function:
selected_items = self.artist_select_items_function(
@@ -521,61 +377,37 @@ class AppleMusicInterface:
for index, item in enumerate(selected_items):
if item["type"] in {"songs", "library-songs"}:
tasks.append(
(
item["type"],
self._get_song_media(
media_id=item["id"],
media_metadata=item,
index=index,
total=len(selected_items),
),
self._get_song_media(
media_id=item["id"],
index=index,
total=len(selected_items),
media_metadata=item,
)
)
elif item["type"] in {"albums", "library-albums"}:
tasks.append(
(
item["type"],
self._get_album_media(
media_id=item["id"],
),
self._get_album_media(
media_id=item["id"],
)
)
else:
tasks.append(
(
item["type"],
self._get_music_video_media(
media_id=item["id"],
media_metadata=item,
index=index,
total=len(selected_items),
),
self._get_music_video_media(
media_id=item["id"],
index=index,
total=len(selected_items),
media_metadata=item,
)
)
if self.concurrency == 1:
for item_type, task in tasks:
if item_type in {"albums", "library-albums"}:
async for result in task:
yield result
else:
yield await task
for task in tasks:
async for media in task:
yield media
else:
async def _collect_generator(generator_or_coroutine, item_type):
if item_type in {"albums", "library-albums"}:
results = []
async for result in generator_or_coroutine:
results.append(result)
return results
else:
return [await generator_or_coroutine]
collected_tasks = [
_collect_generator(task, item_type) for item_type, task in tasks
]
for batch in await safe_gather(*collected_tasks, limit=self.concurrency):
collected_tasks = [self._collect_generator(task) for task in tasks]
batches = await safe_gather(*collected_tasks, limit=self.concurrency)
for batch in batches:
for media in batch:
yield media
@@ -594,20 +426,20 @@ class AppleMusicInterface:
)
if url_info.type == "song" or url_info.sub_id:
media = await self._get_song_media(
async for media in self._get_song_media(
media_id=url_info.sub_id or url_info.id,
index=0,
total=1,
media_id=url_info.sub_id or url_info.id,
)
yield media
):
yield media
elif url_info.type == "music-video":
media = await self._get_music_video_media(
async for media in self._get_music_video_media(
media_id=url_info.id,
index=0,
total=1,
media_id=url_info.id,
)
yield media
):
yield media
elif url_info.type == "album" or url_info.library_type == "albums":
async for media in self._get_album_media(
@@ -624,10 +456,10 @@ class AppleMusicInterface:
yield media
elif url_info.type == "post":
media = await self._get_uploaded_video_media(
async for media in self._get_uploaded_video_media(
media_id=url_info.id,
)
yield media
):
yield media
elif url_info.type == "artist":
async for media in self._get_artist_media(
+22 -19
View File
@@ -1,6 +1,6 @@
import asyncio
import urllib.parse
from typing import Callable
from typing import AsyncGenerator, Callable
import m3u8
import structlog
@@ -378,35 +378,36 @@ class AppleMusicMusicVideoInterface:
async def get_media(
self,
music_video_metadata: dict,
playlist_metadata: dict | None = None,
playlist_track: dict | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.base.parse_catalog_media_id(music_video_metadata),
media_metadata=music_video_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_music_video(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(music_video_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
if media.playlist_metadata:
media.playlist_tags = self.base.get_playlist_tags(
playlist_metadata,
playlist_track,
media.playlist_metadata,
media.index,
)
media.cover = await self.base.get_cover(music_video_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(music_video_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(media.media_metadata)
media.tags = await self.get_tags(
music_video_metadata,
media.media_metadata,
itunes_page_metadata,
)
media.stream_info = await self.get_stream_info(
music_video_metadata,
media.media_metadata,
itunes_page_metadata,
)
if not media.stream_info:
@@ -423,4 +424,6 @@ class AppleMusicMusicVideoInterface:
media.decryption_key = await self.get_decryption_key(media.stream_info)
return media
media.partial = False
yield media
+25 -25
View File
@@ -3,9 +3,9 @@ import base64
import datetime
import json
import re
from typing import Callable
from xml.dom import minidom
import struct
from typing import AsyncGenerator, Callable
from xml.dom import minidom
from xml.etree import ElementTree
import m3u8
@@ -39,7 +39,6 @@ class AppleMusicSongInterface:
synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC,
codec_priority: list[SongCodec] = [SongCodec.AAC_LEGACY],
use_album_date: bool = False,
skip_decryption_key_non_legacy: bool = False,
skip_stream_info: bool = False,
ask_codec_function: Callable[[list[dict]], dict | None] | None = None,
):
@@ -47,7 +46,6 @@ class AppleMusicSongInterface:
self.synced_lyrics_format = synced_lyrics_format
self.codec_priority = codec_priority
self.use_album_date = use_album_date
self.skip_decryption_key_non_legacy = skip_decryption_key_non_legacy
self.skip_stream_info = skip_stream_info
self.ask_codec_function = ask_codec_function
@@ -477,30 +475,31 @@ class AppleMusicSongInterface:
async def get_media(
self,
song_metadata: dict,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.base.parse_catalog_media_id(song_metadata),
media_metadata=song_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_song(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(song_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(
media_id=media.media_id,
)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
if media.playlist_metadata:
media.playlist_tags = self.base.get_playlist_tags(
playlist_metadata,
playlist_track,
media.playlist_metadata,
media.index,
)
media.cover = await self.base.get_cover(song_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
media.lyrics = await self.get_lyrics(song_metadata)
media.lyrics = await self.get_lyrics(media.media_metadata)
webplayback = await self.base.apple_music_api.get_webplayback(media.media_id)
@@ -511,7 +510,7 @@ class AppleMusicSongInterface:
if not self.skip_stream_info:
media.stream_info = await self.get_stream_info(
song_metadata,
media.media_metadata,
webplayback,
)
if not media.stream_info:
@@ -521,17 +520,16 @@ class AppleMusicSongInterface:
)
if (
not self.skip_decryption_key_non_legacy
not self.base.use_wrapper
and not media.stream_info.audio_track.widevine_pssh
) or (
self.skip_decryption_key_non_legacy
and not media.stream_info.audio_track.fairplay_key
self.base.use_wrapper and not media.stream_info.audio_track.fairplay_key
):
raise GamdlInterfaceDecryptionNotAvailableError(media_id=media.media_id)
if (
media.stream_info.audio_track.widevine_pssh
and not self.skip_decryption_key_non_legacy
and not self.base.use_wrapper
) or media.stream_info.audio_track.legacy:
media.decryption_key = DecryptionKeyAv(
audio_track=await self.base.get_decryption_key(
@@ -540,4 +538,6 @@ class AppleMusicSongInterface:
)
)
return media
media.partial = False
yield media
+2 -2
View File
@@ -155,9 +155,10 @@ class Cover:
@dataclass
class AppleMusicMedia:
media_id: str
media_metadata: dict
index: int = 0
total: int = 0
partial: bool = True
media_metadata: dict | None = None
error: BaseException | None = None
playlist_metadata: dict | None = None
playlist_tags: PlaylistTags | None = None
@@ -167,7 +168,6 @@ class AppleMusicMedia:
tags: MediaTags | None = None
stream_info: StreamInfoAv | None = None
decryption_key: DecryptionKeyAv | None = None
flat_filter_result: Any = None
@dataclass
+18 -11
View File
@@ -1,5 +1,6 @@
import asyncio
from collections.abc import Callable
from typing import AsyncGenerator
import structlog
@@ -105,22 +106,28 @@ class AppleMusicUploadedVideoInterface:
async def get_media(
self,
uploaded_video_metadata: dict,
) -> AppleMusicMedia:
media = AppleMusicMedia(
uploaded_video_metadata["id"],
uploaded_video_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_uploaded_video(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(uploaded_video_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
media.cover = await self.base.get_cover(uploaded_video_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
media.stream_info = await self.get_stream_info(uploaded_video_metadata)
media.stream_info = await self.get_stream_info(media.media_metadata)
if not media.stream_info:
raise GamdlInterfaceFormatNotAvailableError(media.media_id)
media.tags = self.get_tags(uploaded_video_metadata)
media.tags = self.get_tags(media.media_metadata)
return media
media.partial = False
yield media
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "gamdl"
version = "3.1"
version = "3.3"
description = "A command-line app for downloading Apple Music songs, music videos and post videos."
readme = "README.md"
license = "MIT"
Generated
+1 -1
View File
@@ -223,7 +223,7 @@ wheels = [
[[package]]
name = "gamdl"
version = "3.1"
version = "3.3"
source = { virtual = "." }
dependencies = [
{ name = "async-lru" },