Use composition for AppleMusic interfaces

This commit is contained in:
Rafael Moraes
2026-04-20 10:22:56 -03:00
parent 61ea24bfdd
commit 5bbe87500a
3 changed files with 53 additions and 50 deletions
+24 -26
View File
@@ -4,10 +4,6 @@ from typing import Callable
import m3u8
import structlog
from async_lru import alru_cache
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from pywidevine import Cdm
from .base import AppleMusicBaseInterface
from .constants import MP4_FORMAT_CODECS
@@ -29,7 +25,7 @@ from .types import (
logger = structlog.get_logger(__name__)
class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
class AppleMusicMusicVideoInterface:
def __init__(
self,
base: AppleMusicBaseInterface,
@@ -50,14 +46,14 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
self.ask_video_codec_function = ask_video_codec_function
self.ask_audio_codec_function = ask_audio_codec_function
self.__dict__.update(base.__dict__)
self._base = base
async def get_itunes_page_metadata(
self,
music_video_metadata: dict,
) -> dict:
url_media_id = self.parse_media_id_from_url(music_video_metadata)
itunes_page = await self.itunes_api.get_itunes_page(
url_media_id = self._base.parse_media_id_from_url(music_video_metadata)
itunes_page = await self._base.itunes_api.get_itunes_page(
"music-video",
url_media_id,
)
@@ -92,11 +88,11 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
) -> MediaTags:
log = logger.bind(
action="get_music_video_tags",
media_id=self.parse_catalog_media_id(metadata),
media_id=self._base.parse_catalog_media_id(metadata),
)
url_media_id = self.parse_media_id_from_url(metadata)
lookup_metadata = (await self.itunes_api.get_lookup_result(url_media_id))[
url_media_id = self._base.parse_media_id_from_url(metadata)
lookup_metadata = (await self._base.itunes_api.get_lookup_result(url_media_id))[
"results"
]
@@ -112,18 +108,20 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
artist=lookup_metadata[0]["artistName"],
artist_id=int(lookup_metadata[0]["artistId"]),
copyright=itunes_page_metadata.get("copyright"),
date=self.parse_date(lookup_metadata[0]["releaseDate"]),
date=self._base.parse_date(lookup_metadata[0]["releaseDate"]),
genre=lookup_metadata[0]["primaryGenreName"],
genre_id=int(itunes_page_metadata["genres"][0]["genreId"]),
media_type=MediaType.MUSIC_VIDEO,
storefront=self.itunes_api.storefront_id,
storefront=self._base.itunes_api.storefront_id,
title=lookup_metadata[0]["trackCensoredName"],
title_id=int(metadata["id"]),
rating=rating,
)
if len(lookup_metadata) > 1:
album = await self.get_album_cached(itunes_page_metadata["collectionId"])
album = await self._base.get_album_cached(
itunes_page_metadata["collectionId"]
)
if not album:
return tags
@@ -147,10 +145,10 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
) -> StreamInfoAv | None:
log = logger.bind(
action="get_music_video_stream_info",
media_id=self.parse_catalog_media_id(metadata),
media_id=self._base.parse_catalog_media_id(metadata),
)
url_media_id = self.parse_media_id_from_url(metadata)
url_media_id = self._base.parse_media_id_from_url(metadata)
m3u8_master_url = None
if url_media_id == metadata["id"]:
@@ -159,7 +157,7 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
)
if not m3u8_master_url:
webplayback_response = await self.apple_music_api.get_webplayback(
webplayback_response = await self._base.apple_music_api.get_webplayback(
metadata["id"]
)
m3u8_master_url = self._get_m3u8_master_url_from_webplayback(
@@ -167,7 +165,7 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
)
playlist_master_m3u8_obj = m3u8.loads(
(await self.get_response(m3u8_master_url)).text
(await self._base.get_response(m3u8_master_url)).text
)
playlist_master_m3u8_obj.base_uri = m3u8_master_url.rpartition("/")[0]
stream_info_video = await self._get_stream_info_video(playlist_master_m3u8_obj)
@@ -321,7 +319,7 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
stream_info.width, stream_info.height = playlist.stream_info.resolution
playlist_m3u8_obj = m3u8.loads(
(await self.get_response(stream_info.stream_url)).text
(await self._base.get_response(stream_info.stream_url)).text
)
stream_info.widevine_pssh = self._get_widevine_pssh(playlist_m3u8_obj)
stream_info.fairplay_key = self._get_fairplay_key(playlist_m3u8_obj)
@@ -347,7 +345,7 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
stream_info.codec = playlist["group_id"]
playlist_m3u8_obj = m3u8.loads(
(await self.get_response(stream_info.stream_url)).text
(await self._base.get_response(stream_info.stream_url)).text
)
stream_info.widevine_pssh = self._get_widevine_pssh(playlist_m3u8_obj)
stream_info.fairplay_key = self._get_fairplay_key(playlist_m3u8_obj)
@@ -360,11 +358,11 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
stream_info: StreamInfoAv,
) -> DecryptionKeyAv:
decryption_key_video, decryption_key_audio = await asyncio.gather(
super().get_decryption_key(
self._base.get_decryption_key(
stream_info.video_track.widevine_pssh,
stream_info.media_id,
),
super().get_decryption_key(
self._base.get_decryption_key(
stream_info.audio_track.widevine_pssh,
stream_info.media_id,
),
@@ -382,21 +380,21 @@ class AppleMusicMusicVideoInterface(AppleMusicBaseInterface):
playlist_track: dict | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.parse_catalog_media_id(music_video_metadata),
media_id=self._base.parse_catalog_media_id(music_video_metadata),
media_metadata=music_video_metadata,
)
if not self.is_media_streamable(music_video_metadata):
if not self._base.is_media_streamable(music_video_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
media.playlist_tags = self.get_playlist_tags(
media.playlist_tags = self._base.get_playlist_tags(
playlist_metadata,
playlist_track,
)
media.cover = await self.get_cover(music_video_metadata)
media.cover = await self._base.get_cover(music_video_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(music_video_metadata)
media.tags = await self.get_tags(
+23 -18
View File
@@ -33,7 +33,7 @@ from .types import (
logger = structlog.get_logger(__name__)
class AppleMusicSongInterface(AppleMusicBaseInterface):
class AppleMusicSongInterface:
def __init__(
self,
base: AppleMusicBaseInterface,
@@ -49,14 +49,15 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
self.skip_decryption_key_non_legacy = skip_decryption_key_non_legacy
self.ask_codec_function = ask_codec_function
self.__dict__.update(base.__dict__)
self._base = base
async def get_lyrics(
self,
song_metadata: dict,
) -> Lyrics | None:
log = logger.bind(
action="get_lyrics", song_id=self.parse_catalog_media_id(song_metadata)
action="get_lyrics",
song_id=self._base.parse_catalog_media_id(song_metadata),
)
if not song_metadata["attributes"]["hasLyrics"]:
@@ -68,8 +69,8 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
or "lyrics" not in song_metadata["relationships"]
):
song_metadata = (
await self.apple_music_api.get_song(
self.parse_catalog_media_id(song_metadata)
await self._base.apple_music_api.get_song(
self._base.parse_catalog_media_id(song_metadata)
)
)["data"][0]
@@ -218,10 +219,10 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
composer_sort=webplayback_metadata.get("sort-composer"),
copyright=webplayback_metadata.get("copyright"),
date=(
await self.get_media_date(webplayback_metadata["playlistId"])
await self._base.get_media_date(webplayback_metadata["playlistId"])
if self.use_album_date
else (
self.parse_date(webplayback_metadata["releaseDate"])
self._base.parse_date(webplayback_metadata["releaseDate"])
if webplayback_metadata.get("releaseDate")
else None
)
@@ -267,8 +268,8 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
if "extendedAssetUrls" not in song_metadata["attributes"]:
song_metadata = (
await self.apple_music_api.get_song(
self.parse_catalog_media_id(song_metadata),
await self._base.apple_music_api.get_song(
self._base.parse_catalog_media_id(song_metadata),
)
)["data"][0]
@@ -278,7 +279,9 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
if not m3u8_master_url:
return None
m3u8_master_obj = m3u8.loads((await self.get_response(m3u8_master_url)).text)
m3u8_master_obj = m3u8.loads(
(await self._base.get_response(m3u8_master_url)).text
)
m3u8_master_data = m3u8_master_obj.data
if codec == SongCodec.ASK:
@@ -324,7 +327,7 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
)
else:
m3u8_obj = m3u8.loads(
(await self.get_response(stream_info.stream_url)).text
(await self._base.get_response(stream_info.stream_url)).text
)
stream_info.widevine_pssh = self._get_drm_uri_from_m3u8_keys(
@@ -433,7 +436,9 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
i for i in webplayback["songList"][0]["assets"] if i["flavor"] == flavor
)["URL"]
m3u8_obj = m3u8.loads((await self.get_response(stream_info.stream_url)).text)
m3u8_obj = m3u8.loads(
(await self._base.get_response(stream_info.stream_url)).text
)
stream_info.widevine_pssh = m3u8_obj.keys[0].uri
stream_info_av = StreamInfoAv(
@@ -452,27 +457,27 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
playlist_track: int | None = None,
) -> AppleMusicMedia:
media = AppleMusicMedia(
media_id=self.parse_catalog_media_id(song_metadata),
media_id=self._base.parse_catalog_media_id(song_metadata),
media_metadata=song_metadata,
)
if not self.is_media_streamable(song_metadata):
if not self._base.is_media_streamable(song_metadata):
raise GamdlInterfaceMediaNotStreamableError(
media_id=media.media_id,
)
if playlist_metadata and playlist_track:
media.playlist_metadata = playlist_metadata
media.playlist_tags = self.get_playlist_tags(
media.playlist_tags = self._base.get_playlist_tags(
playlist_metadata,
playlist_track,
)
media.cover = await self.get_cover(song_metadata)
media.cover = await self._base.get_cover(song_metadata)
media.lyrics = await self.get_lyrics(song_metadata)
webplayback = await self.apple_music_api.get_webplayback(media.media_id)
webplayback = await self._base.apple_music_api.get_webplayback(media.media_id)
media.tags = await self.get_tags(
webplayback,
@@ -503,7 +508,7 @@ class AppleMusicSongInterface(AppleMusicBaseInterface):
and not self.skip_decryption_key_non_legacy
) or media.stream_info.audio_track.legacy:
media.decryption_key = DecryptionKeyAv(
audio_track=await self.get_decryption_key(
audio_track=await self._base.get_decryption_key(
media.stream_info.audio_track.widevine_pssh,
media.media_id,
)
+6 -6
View File
@@ -14,7 +14,7 @@ from .types import AppleMusicMedia, MediaFileFormat, MediaTags, StreamInfo, Stre
logger = structlog.get_logger(__name__)
class AppleMusicUploadedVideoInterface(AppleMusicBaseInterface):
class AppleMusicUploadedVideoInterface:
def __init__(
self,
base: AppleMusicBaseInterface,
@@ -24,7 +24,7 @@ class AppleMusicUploadedVideoInterface(AppleMusicBaseInterface):
self.quality = quality
self.ask_quality_function = ask_quality_function
self.__dict__.update(base.__dict__)
self._base = base
def _get_best_stream_url(self, metadata: dict) -> str:
best_quality = next(
@@ -89,10 +89,10 @@ class AppleMusicUploadedVideoInterface(AppleMusicBaseInterface):
tags = MediaTags(
artist=attributes.get("artistName"),
date=self.parse_date(upload_date) if upload_date else None,
date=self._base.parse_date(upload_date) if upload_date else None,
title=attributes.get("name"),
title_id=int(metadata["id"]),
storefront=self.itunes_api.storefront_id,
storefront=self._base.itunes_api.storefront_id,
)
log.debug("success", tags=tags)
@@ -108,10 +108,10 @@ class AppleMusicUploadedVideoInterface(AppleMusicBaseInterface):
uploaded_video_metadata,
)
if not self.is_media_streamable(uploaded_video_metadata):
if not self._base.is_media_streamable(uploaded_video_metadata):
raise GamdlInterfaceMediaNotStreamableError(media.media_id)
media.cover = await self.get_cover(uploaded_video_metadata)
media.cover = await self._base.get_cover(uploaded_video_metadata)
media.stream_info = await self.get_stream_info(uploaded_video_metadata)
if not media.stream_info: