Refactor interface media fetching

This commit is contained in:
Rafael Moraes
2026-04-24 15:44:19 -03:00
parent 3a907cb76c
commit 34a92b6efc
6 changed files with 266 additions and 439 deletions
+11 -2
View File
@@ -38,5 +38,14 @@ class GamdlInterfaceUrlParseError(GamdlInterfaceError):
class GamdlInterfaceArtistMediaTypeError(GamdlInterfaceError):
def __init__(self, media_type: str):
super().__init__(f"Artist has no media of type: {media_type}")
def __init__(self, media_id: str, media_type: str):
super().__init__(
f"Artist has no media of type (media ID: {media_id}): {media_type}"
)
class GamdlInterfaceFlatFilterExcludedError(GamdlInterfaceError):
def __init__(self, media_id: str, result: Any):
super().__init__(f"Media excluded by flat filter: {media_id}")
self.result = result
+191 -386
View File
@@ -10,6 +10,7 @@ from .exceptions import (
GamdlInterfaceMediaNotAllowedError,
GamdlInterfaceUrlParseError,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceFlatFilterExcludedError,
)
from .music_video import AppleMusicMusicVideoInterface
from .song import AppleMusicSongInterface
@@ -32,7 +33,7 @@ class AppleMusicInterface:
Callable[[ArtistMediaType, list[dict]], list[dict] | None] | None
) = None,
flat_filter_function: Callable[[dict], Any] | None = None,
concurrency: int = 5,
concurrency: int = 1,
disallowed_media_types: list[str] | None = None,
) -> None:
self.song = song
@@ -64,186 +65,120 @@ class AppleMusicInterface:
return url_match
async def _run_flat_filter(self, media: AppleMusicMedia) -> None:
if not self.flat_filter_function or not media.partial:
return
result = self.flat_filter_function(media.media_metadata)
if asyncio.iscoroutine(result):
result = await result
if result:
raise GamdlInterfaceFlatFilterExcludedError(media.media_id, result)
def _run_media_type_filter(self, media: AppleMusicMedia) -> None:
if not self.disallowed_media_types or not media.partial:
return
if media.media_metadata["type"] in self.disallowed_media_types:
raise GamdlInterfaceMediaNotAllowedError(
media.media_metadata["type"],
media.media_id,
)
async def _get_song_media(
self,
index: int,
total: int = 0,
media_id: str | None = None,
media_id: str,
index: int | None = None,
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
if not media_metadata:
try:
media_metadata = (
await self.base.apple_music_api.get_song(
media_id,
)
)[
"data"
][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
index=index,
total=total,
error=e,
)
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
if not media_id:
media_id = self.base.parse_catalog_media_id(media_metadata)
if index is not None:
media.index = index
if total is not None:
media.total = total
base_media = AppleMusicMedia(media_id, media_metadata, index, total)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
try:
media = await self.song.get_media(
media_metadata,
playlist_metadata,
playlist_track,
)
media.index = index
media.total = total
return media
async for media in self.song.get_media(media):
yield media
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_music_video_media(
self,
index: int,
total: int = 0,
media_id: str | None = None,
media_id: str,
index: int | None = None,
total: int | None = None,
media_metadata: dict | None = None,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
if not media_metadata:
try:
media_metadata = (
await self.base.apple_music_api.get_music_video(
media_id,
)
)["data"][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
index=index,
total=total,
error=e,
)
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
if not media_id:
media_id = self.base.parse_catalog_media_id(media_metadata)
if index is not None:
media.index = index
if total is not None:
media.total = total
base_media = AppleMusicMedia(media_id, media_metadata, index, total)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
media.media_metadata = media_metadata
media.playlist_metadata = playlist_metadata
try:
media = await self.music_video.get_media(
media_metadata,
playlist_metadata,
playlist_track,
)
media.index = index
media.total = total
return media
async for media in self.music_video.get_media(media):
yield media
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_uploaded_video_media(
self,
media_id: str,
) -> AppleMusicMedia:
try:
media_metadata = (
await self.base.apple_music_api.get_uploaded_video(
media_id,
)
)["data"][0]
except Exception as e:
return AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
base_media = AppleMusicMedia(media_id, media_metadata)
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
base_media.flat_filter_result = flat_filter_result
return base_media
if (
self.disallowed_media_types
and base_media.media_metadata["type"] in self.disallowed_media_types
):
base_media.error = GamdlInterfaceMediaNotAllowedError(
base_media.media_metadata["type"],
media_id,
)
return base_media
) -> AsyncGenerator[AppleMusicMedia, None]:
media = AppleMusicMedia(
media_id=media_id,
)
try:
return await self.uploaded_video.get_media(media_metadata)
async for media in self.music_video.get_media(media):
yield
self._run_media_type_filter(media)
await self._run_flat_filter(media)
except Exception as e:
base_media.error = e
return base_media
media.partial = False
media.error = e
yield media
return
async def _get_album_media(
self,
media_id: str,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_library_album(
media_id,
)
@@ -252,80 +187,50 @@ class AppleMusicInterface:
media_id,
)
)["data"][0]
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
base_media.partial = False
base_media.error = e
yield base_media
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
yield base_media
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
tracks = media_metadata["relationships"]["tracks"]["data"]
tracks = base_media.media_metadata["relationships"]["tracks"]["data"]
tasks = [
(
self._get_song_media(
index=index,
total=media_metadata["attributes"]["trackCount"],
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
index=index,
total=media_metadata["attributes"]["trackCount"],
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
)
)
for index, track in enumerate(tracks)
]
if self.concurrency == 1:
for task in tasks:
async for result in task:
yield result
else:
for task in await safe_gather(*tasks, limit=self.concurrency):
yield task
for task in tasks:
async for media in task:
yield media
async def _get_playlist_media(
self,
media_id: str,
is_library: bool = False,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_library_playlist(
media_id,
)
@@ -334,103 +239,100 @@ class AppleMusicInterface:
media_id,
)
)["data"][0]
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=None,
error=e,
)
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
tracks = media_metadata["relationships"]["tracks"]["data"]
next_uri = media_metadata["relationships"]["tracks"].get("next")
href_uri = media_metadata["relationships"]["tracks"].get("href")
while next_uri:
try:
tracks = base_media.media_metadata["relationships"]["tracks"]["data"]
next_uri = base_media.media_metadata["relationships"]["tracks"].get("next")
href_uri = base_media.media_metadata["relationships"]["tracks"].get("href")
while next_uri:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=e,
)
return
tracks.extend(extended_data["data"])
next_uri = extended_data.get("next")
tracks.extend(extended_data["data"])
next_uri = extended_data.get("next")
except Exception as e:
base_media.partial = False
base_media.error = e
yield base_media
return
yield base_media
tasks = [
(
self._get_song_media(
index=index,
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
playlist_track=index + 1,
playlist_metadata=base_media.media_metadata,
)
if track["type"] in {"songs", "library-songs"}
else self._get_music_video_media(
index=index,
media_id=track["id"],
index=index,
total=base_media.media_metadata["attributes"]["trackCount"],
media_metadata=track,
playlist_metadata=media_metadata,
playlist_track=index + 1,
playlist_metadata=base_media.media_metadata,
)
)
for index, track in enumerate(tracks)
]
if self.concurrency == 1:
for task in tasks:
async for result in task:
yield result
else:
for task in await safe_gather(*tasks, limit=self.concurrency):
yield task
for task in tasks:
async for media in task:
yield media
async def _get_artist_media(
self,
media_id: str,
) -> AsyncGenerator[AppleMusicMedia, None]:
base_media = AppleMusicMedia(media_id)
try:
media_metadata = (
base_media.media_metadata = (
await self.base.apple_music_api.get_artist(
media_id,
)
)[
"data"
][0]
)["data"][0]
self._run_media_type_filter(base_media)
await self._run_flat_filter(base_media)
if self.artist_select_media_type_function:
artist_media_type = self.artist_select_media_type_function(
list(ArtistMediaType),
base_media.media_metadata,
)
if asyncio.iscoroutine(artist_media_type):
artist_media_type = await artist_media_type
else:
artist_media_type = list(ArtistMediaType)[0]
relation_key, type_key = artist_media_type.path_key
items_relation = base_media.media_metadata.get(relation_key, {}).get(
type_key, {}
)
items = items_relation.get("data", [])
if not items:
raise GamdlInterfaceArtistMediaTypeError(
base_media.media_id,
str(artist_media_type),
)
next_uri = items_relation.get("next")
href_uri = items_relation.get("href")
while next_uri:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
items.extend(extended_data.get("data", []))
next_uri = extended_data.get("next")
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
@@ -439,73 +341,7 @@ class AppleMusicInterface:
)
return
if self.flat_filter_function:
flat_filter_result = self.flat_filter_function(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
if flat_filter_result:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
flat_filter_result=flat_filter_result,
)
return
if (
self.disallowed_media_types
and media_metadata["type"] in self.disallowed_media_types
):
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceMediaNotAllowedError(
media_metadata["type"],
media_id,
),
)
return
if self.artist_select_media_type_function:
artist_media_type = self.artist_select_media_type_function(
list(ArtistMediaType),
media_metadata,
)
if asyncio.iscoroutine(artist_media_type):
artist_media_type = await artist_media_type
else:
artist_media_type = list(ArtistMediaType)[0]
relation_key, type_key = artist_media_type.path_key
items_relation = media_metadata.get(relation_key, {}).get(type_key, {})
items = items_relation.get("data", [])
if not items:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=GamdlInterfaceArtistMediaTypeError(str(artist_media_type)),
)
return
next_uri = items_relation.get("next")
href_uri = items_relation.get("href")
while next_uri:
try:
extended_data = await self.base.apple_music_api.get_extended_api_data(
next_uri,
href_uri,
)
except Exception as e:
yield AppleMusicMedia(
media_id=media_id,
media_metadata=media_metadata,
error=e,
)
return
items.extend(extended_data.get("data", []))
next_uri = extended_data.get("next")
yield base_media
if self.artist_select_items_function:
selected_items = self.artist_select_items_function(
@@ -521,63 +357,32 @@ class AppleMusicInterface:
for index, item in enumerate(selected_items):
if item["type"] in {"songs", "library-songs"}:
tasks.append(
(
item["type"],
self._get_song_media(
media_id=item["id"],
media_metadata=item,
index=index,
total=len(selected_items),
),
self._get_song_media(
media_id=item["id"],
index=index,
total=len(selected_items),
media_metadata=item,
)
)
elif item["type"] in {"albums", "library-albums"}:
tasks.append(
(
item["type"],
self._get_album_media(
media_id=item["id"],
),
self._get_album_media(
media_id=item["id"],
)
)
else:
tasks.append(
(
item["type"],
self._get_music_video_media(
media_id=item["id"],
media_metadata=item,
index=index,
total=len(selected_items),
),
self._get_music_video_media(
media_id=item["id"],
index=index,
total=len(selected_items),
media_metadata=item,
)
)
if self.concurrency == 1:
for item_type, task in tasks:
if item_type in {"albums", "library-albums"}:
async for result in task:
yield result
else:
yield await task
else:
async def _collect_generator(generator_or_coroutine, item_type):
if item_type in {"albums", "library-albums"}:
results = []
async for result in generator_or_coroutine:
results.append(result)
return results
else:
return [await generator_or_coroutine]
collected_tasks = [
_collect_generator(task, item_type) for item_type, task in tasks
]
for batch in await safe_gather(*collected_tasks, limit=self.concurrency):
for media in batch:
yield media
for task in tasks:
async for media in task:
yield media
async def get_media_from_url(
self,
@@ -594,20 +399,20 @@ class AppleMusicInterface:
)
if url_info.type == "song" or url_info.sub_id:
media = await self._get_song_media(
async for media in self._get_song_media(
media_id=url_info.sub_id or url_info.id,
index=0,
total=1,
media_id=url_info.sub_id or url_info.id,
)
yield media
):
yield media
elif url_info.type == "music-video":
media = await self._get_music_video_media(
async for media in self._get_music_video_media(
media_id=url_info.id,
index=0,
total=1,
media_id=url_info.id,
)
yield media
):
yield media
elif url_info.type == "album" or url_info.library_type == "albums":
async for media in self._get_album_media(
@@ -624,10 +429,10 @@ class AppleMusicInterface:
yield media
elif url_info.type == "post":
media = await self._get_uploaded_video_media(
async for media in self._get_uploaded_video_media(
media_id=url_info.id,
)
yield media
):
yield media
elif url_info.type == "artist":
async for media in self._get_artist_media(
+22 -19
View File
@@ -1,6 +1,6 @@
import asyncio
import urllib.parse
from typing import Callable
from typing import AsyncGenerator, Callable
import m3u8
import structlog
@@ -378,35 +378,36 @@ class AppleMusicMusicVideoInterface:
async def get_media(
self,
music_video_metadata: dict,
playlist_metadata: dict | None = None,
playlist_track: dict | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.base.parse_catalog_media_id(music_video_metadata),
media_metadata=music_video_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_music_video(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(music_video_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
if media.playlist_metadata:
media.playlist_tags = self.base.get_playlist_tags(
playlist_metadata,
playlist_track,
media.playlist_metadata,
media.index,
)
media.cover = await self.base.get_cover(music_video_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(music_video_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(media.media_metadata)
media.tags = await self.get_tags(
music_video_metadata,
media.media_metadata,
itunes_page_metadata,
)
media.stream_info = await self.get_stream_info(
music_video_metadata,
media.media_metadata,
itunes_page_metadata,
)
if not media.stream_info:
@@ -423,4 +424,6 @@ class AppleMusicMusicVideoInterface:
media.decryption_key = await self.get_decryption_key(media.stream_info)
return media
media.partial = False
yield media
+22 -19
View File
@@ -3,9 +3,9 @@ import base64
import datetime
import json
import re
from typing import Callable
from xml.dom import minidom
import struct
from typing import AsyncGenerator, Callable
from xml.dom import minidom
from xml.etree import ElementTree
import m3u8
@@ -475,30 +475,31 @@ class AppleMusicSongInterface:
async def get_media(
self,
song_metadata: dict,
playlist_metadata: dict | None = None,
playlist_track: int | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.base.parse_catalog_media_id(song_metadata),
media_metadata=song_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_song(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(song_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(
media_id=media.media_id,
)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
if media.playlist_metadata:
media.playlist_tags = self.base.get_playlist_tags(
playlist_metadata,
playlist_track,
media.playlist_metadata,
media.index,
)
media.cover = await self.base.get_cover(song_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
media.lyrics = await self.get_lyrics(song_metadata)
media.lyrics = await self.get_lyrics(media.media_metadata)
webplayback = await self.base.apple_music_api.get_webplayback(media.media_id)
@@ -509,7 +510,7 @@ class AppleMusicSongInterface:
if not self.skip_stream_info:
media.stream_info = await self.get_stream_info(
song_metadata,
media.media_metadata,
webplayback,
)
if not media.stream_info:
@@ -537,4 +538,6 @@ class AppleMusicSongInterface:
)
)
return media
media.partial = False
yield media
+2 -2
View File
@@ -155,9 +155,10 @@ class Cover:
@dataclass
class AppleMusicMedia:
media_id: str
media_metadata: dict
index: int = 0
total: int = 0
partial: bool = True
media_metadata: dict | None = None
error: BaseException | None = None
playlist_metadata: dict | None = None
playlist_tags: PlaylistTags | None = None
@@ -167,7 +168,6 @@ class AppleMusicMedia:
tags: MediaTags | None = None
stream_info: StreamInfoAv | None = None
decryption_key: DecryptionKeyAv | None = None
flat_filter_result: Any = None
@dataclass
+18 -11
View File
@@ -1,5 +1,6 @@
import asyncio
from collections.abc import Callable
from typing import AsyncGenerator
import structlog
@@ -105,22 +106,28 @@ class AppleMusicUploadedVideoInterface:
async def get_media(
self,
uploaded_video_metadata: dict,
) -> AppleMusicMedia:
media = AppleMusicMedia(
uploaded_video_metadata["id"],
uploaded_video_metadata,
)
media: AppleMusicMedia,
) -> AsyncGenerator[AppleMusicMedia, None]:
if not media.media_metadata:
media.media_metadata = (
await self.base.apple_music_api.get_uploaded_video(media.media_id)
)["data"][0]
if not self.base.is_media_streamable(uploaded_video_metadata):
media.media_id = self.base.parse_catalog_media_id(media.media_metadata)
yield media
if not self.base.is_media_streamable(media.media_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
media.cover = await self.base.get_cover(uploaded_video_metadata)
media.cover = await self.base.get_cover(media.media_metadata)
media.stream_info = await self.get_stream_info(uploaded_video_metadata)
media.stream_info = await self.get_stream_info(media.media_metadata)
if not media.stream_info:
raise GamdlInterfaceFormatNotAvailableError(media.media_id)
media.tags = self.get_tags(uploaded_video_metadata)
media.tags = self.get_tags(media.media_metadata)
return media
media.partial = False
yield media