Refactor music video to use tags from asset_info when using wrapper

This commit is contained in:
Rafael Moraes
2026-05-18 14:05:33 -03:00
parent e44b037414
commit 86bbb94274
3 changed files with 90 additions and 17 deletions
+74 -1
View File
@@ -17,7 +17,7 @@ from ..api.apple_music import AppleMusicApi
from ..api.itunes import ItunesApi
from .constants import IMAGE_FILE_EXTENSION_MAP
from .enums import CoverFormat
from .types import Cover, DecryptionKey, PlaylistTags
from .types import Cover, DecryptionKey, PlaylistTags, MediaTags, MediaType, MediaRating
logger = structlog.get_logger(__name__)
@@ -336,3 +336,76 @@ class AppleMusicBaseInterface:
log.debug("success", playlist_tags=playlist_tags)
return playlist_tags
async def get_tags_from_asset_info(
self,
asset_data: dict,
lyrics: str | None = None,
use_album_date: bool = False,
) -> MediaTags:
log = logger.bind(
action="get_tags_from_asset_info", asset_id=asset_data["itemId"]
)
tags = MediaTags(
album=asset_data.get("playlistName"),
album_artist=asset_data.get("playlistArtistName"),
album_id=(
int(asset_data["playlistId"]) if asset_data.get("playlistId") else None
),
album_sort=asset_data.get("sort-album"),
artist=asset_data["artistName"],
artist_id=int(asset_data["artistId"]),
artist_sort=asset_data["sort-artist"],
comment=asset_data.get("comments"),
compilation=asset_data.get("compilation"),
composer=asset_data.get("composerName"),
composer_id=(
int(asset_data.get("composerId"))
if asset_data.get("composerId")
else None
),
composer_sort=asset_data.get("sort-composer"),
copyright=asset_data.get("copyright"),
date=(
await self.get_media_date(asset_data["playlistId"])
if use_album_date
else (
self.parse_date(asset_data["releaseDate"])
if asset_data.get("releaseDate")
else None
)
),
disc=asset_data.get("discNumber"),
disc_total=asset_data.get("discCount"),
gapless=asset_data.get("gapless"),
genre=asset_data.get("genre"),
genre_id=int(asset_data["genreId"]),
lyrics=lyrics if lyrics else None,
media_type=(
MediaType.SONG
if asset_data["kind"] == "song"
else MediaType.MUSIC_VIDEO
),
rating=MediaRating(asset_data["explicit"]),
storefront=asset_data["s"],
title=asset_data["itemName"],
title_id=int(asset_data["itemId"]),
title_sort=asset_data["sort-name"],
track=asset_data.get("trackNumber"),
track_total=asset_data.get("trackCount"),
xid=asset_data.get("xid"),
)
log.debug("success", tags=tags)
return tags
async def get_wrapper_playback(self, media_id: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.wrapper_url}/playback",
params={"adam_id": media_id},
)
response.raise_for_status()
return response.json()
+11 -4
View File
@@ -413,10 +413,17 @@ class AppleMusicMusicVideoInterface:
media.cover = await self.base.get_cover(media.media_metadata)
itunes_page_metadata = await self.get_itunes_page_metadata(media.media_metadata)
media.tags = await self.get_tags(
media.media_metadata,
itunes_page_metadata,
)
if self.base.use_wrapper:
playback = await self.base.get_wrapper_playback(media.media_id)
media.tags = await self.base.get_tags_from_asset_info(
playback["songList"][0]["assets"][0]["metadata"],
)
else:
media.tags = await self.get_tags(
media.media_metadata,
itunes_page_metadata,
)
media.stream_info = await self.get_stream_info(
media.media_metadata,
+5 -12
View File
@@ -190,15 +190,6 @@ class AppleMusicSongInterface:
return f"[{timestamp.strftime('%M:%S.%f')[:-4]}]{text}"
async def _get_wrapper_playback(self, media_id: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base.wrapper_url}/playback",
params={"adam_id": media_id},
)
response.raise_for_status()
return response.json()
def _get_m3u8_from_playback(self, playback: dict) -> str | None:
return playback["songList"][0].get("hls-playlist-url")
@@ -493,10 +484,11 @@ class AppleMusicSongInterface:
media.lyrics = await self.get_lyrics(media.media_metadata)
if self.base.use_wrapper:
playback = await self._get_wrapper_playback(media.media_id)
media.tags = await self.get_tags(
playback = await self.base.get_wrapper_playback(media.media_id)
media.tags = await self.base.get_tags_from_asset_info(
playback["songList"][0]["assets"][0]["metadata"],
media.lyrics.unsynced if media.lyrics else None,
self.use_album_date,
)
if not self.skip_stream_info:
m3u8_master_url = self._get_m3u8_from_playback(playback)
@@ -508,9 +500,10 @@ class AppleMusicSongInterface:
webplayback = await self.base.apple_music_api.get_webplayback(
media.media_id
)
media.tags = await self.get_tags(
media.tags = await self.base.get_tags_from_asset_info(
webplayback["songList"][0]["assets"][0]["metadata"],
media.lyrics.unsynced if media.lyrics else None,
self.use_album_date,
)
if not self.skip_stream_info:
m3u8_master_url = await self._get_m3u8_from_metadata(