From f9d62ee84bf6738bdc13a4736fdb3efbc6e05e84 Mon Sep 17 00:00:00 2001 From: Rafael Moraes <50295204+glomatico@users.noreply.github.com> Date: Mon, 20 Apr 2026 11:56:32 -0300 Subject: [PATCH] Refactor downloader module --- gamdl/downloader/__init__.py | 9 +- .../{downloader_base.py => base.py} | 197 ++--- gamdl/downloader/constants.py | 43 - gamdl/downloader/downloader.py | 767 +++++------------- gamdl/downloader/downloader_music_video.py | 285 ------- gamdl/downloader/downloader_song.py | 248 ------ gamdl/downloader/downloader_uploaded_video.py | 107 --- gamdl/downloader/enums.py | 22 - gamdl/downloader/exceptions.py | 33 +- gamdl/downloader/hardcoded_wvd.py | 3 - gamdl/downloader/music_video.py | 223 +++++ gamdl/downloader/song.py | 180 ++++ gamdl/downloader/types.py | 37 +- gamdl/downloader/uploaded_video.py | 65 ++ 14 files changed, 764 insertions(+), 1455 deletions(-) rename gamdl/downloader/{downloader_base.py => base.py} (65%) delete mode 100644 gamdl/downloader/downloader_music_video.py delete mode 100644 gamdl/downloader/downloader_song.py delete mode 100644 gamdl/downloader/downloader_uploaded_video.py delete mode 100644 gamdl/downloader/hardcoded_wvd.py create mode 100644 gamdl/downloader/music_video.py create mode 100644 gamdl/downloader/song.py create mode 100644 gamdl/downloader/uploaded_video.py diff --git a/gamdl/downloader/__init__.py b/gamdl/downloader/__init__.py index 8ca117e..12689a9 100644 --- a/gamdl/downloader/__init__.py +++ b/gamdl/downloader/__init__.py @@ -1,8 +1,9 @@ +from .amdecrypt import decrypt_file, decrypt_file_hex +from .base import AppleMusicBaseDownloader from .downloader import AppleMusicDownloader -from .downloader_base import AppleMusicBaseDownloader -from .downloader_music_video import AppleMusicMusicVideoDownloader -from .downloader_song import AppleMusicSongDownloader -from .downloader_uploaded_video import AppleMusicUploadedVideoDownloader from .enums import * from .exceptions import * +from .music_video import AppleMusicMusicVideoDownloader +from .song import AppleMusicSongDownloader from .types import * +from .uploaded_video import AppleMusicUploadedVideoDownloader diff --git a/gamdl/downloader/downloader_base.py b/gamdl/downloader/base.py similarity index 65% rename from gamdl/downloader/downloader_base.py rename to gamdl/downloader/base.py index 9e7056d..b8f9ea5 100644 --- a/gamdl/downloader/downloader_base.py +++ b/gamdl/downloader/base.py @@ -1,30 +1,28 @@ import asyncio import re import shutil -import uuid from pathlib import Path +import structlog from mutagen.mp4 import MP4, MP4Cover -from pywidevine import Cdm, Device from yt_dlp import YoutubeDL from ..interface.enums import CoverFormat +from ..interface.interface import AppleMusicInterface from ..interface.types import MediaTags, PlaylistTags from ..utils import CustomStringFormatter, async_subprocess from .constants import ILLEGAL_CHAR_REPLACEMENT, ILLEGAL_CHARS_RE, TEMP_PATH_TEMPLATE from .enums import DownloadMode -from .hardcoded_wvd import HARDCODED_WVD + +logger = structlog.get_logger(__name__) class AppleMusicBaseDownloader: def __init__( self, + interface: AppleMusicInterface, output_path: str = "./Apple Music", temp_path: str = ".", - wvd_path: str = None, - overwrite: bool = False, - save_cover: bool = False, - save_playlist: bool = False, nm3u8dlre_path: str = "N_m3u8DL-RE", mp4decrypt_path: str = "mp4decrypt", ffmpeg_path: str = "ffmpeg", @@ -32,26 +30,22 @@ class AppleMusicBaseDownloader: use_wrapper: bool = False, wrapper_decrypt_ip: str = "127.0.0.1:10020", download_mode: DownloadMode = DownloadMode.YTDLP, - cover_format: CoverFormat = CoverFormat.JPG, album_folder_template: str = "{album_artist}/{album}", compilation_folder_template: str = "Compilations/{album}", no_album_folder_template: str = "{artist}/Unknown Album", + playlist_folder_template: str = "Playlists/{playlist_artist}", single_disc_file_template: str = "{track:02d} {title}", multi_disc_file_template: str = "{disc}-{track:02d} {title}", no_album_file_template: str = "{title}", - playlist_file_template: str = "Playlists/{playlist_artist}/{playlist_title}", + playlist_file_template: str = "{playlist_title}", date_tag_template: str = "%Y-%m-%dT%H:%M:%SZ", exclude_tags: list[str] = None, - cover_size: int = 1200, truncate: int = None, silent: bool = False, ): + self.interface = interface self.output_path = output_path self.temp_path = temp_path - self.wvd_path = wvd_path - self.overwrite = overwrite - self.save_cover = save_cover - self.save_playlist = save_playlist self.nm3u8dlre_path = nm3u8dlre_path self.mp4decrypt_path = mp4decrypt_path self.ffmpeg_path = ffmpeg_path @@ -59,64 +53,35 @@ class AppleMusicBaseDownloader: self.use_wrapper = use_wrapper self.wrapper_decrypt_ip = wrapper_decrypt_ip self.download_mode = download_mode - self.cover_format = cover_format self.album_folder_template = album_folder_template self.compilation_folder_template = compilation_folder_template self.no_album_folder_template = no_album_folder_template self.single_disc_file_template = single_disc_file_template self.multi_disc_file_template = multi_disc_file_template + self.playlist_folder_template = playlist_folder_template self.no_album_file_template = no_album_file_template self.playlist_file_template = playlist_file_template self.date_tag_template = date_tag_template self.exclude_tags = exclude_tags - self.cover_size = cover_size self.truncate = truncate self.silent = silent - self.initialize() - def initialize(self): self._initialize_binary_paths() - self._initialize_cdm() def _initialize_binary_paths(self): + log = logger.bind(action="initialize_binary_paths") + self.full_nm3u8dlre_path = shutil.which(self.nm3u8dlre_path) self.full_mp4decrypt_path = shutil.which(self.mp4decrypt_path) self.full_ffmpeg_path = shutil.which(self.ffmpeg_path) self.full_mp4box_path = shutil.which(self.mp4box_path) - def _initialize_cdm(self): - if self.wvd_path: - self.cdm = Cdm.from_device(Device.load(self.wvd_path)) - else: - self.cdm = Cdm.from_device(Device.loads(HARDCODED_WVD)) - self.cdm.MAX_NUM_OF_SESSIONS = float("inf") - - def get_random_uuid(self) -> str: - return uuid.uuid4().hex[:8] - - def is_media_streamable( - self, - media_metadata: dict, - ) -> bool: - return bool(media_metadata["attributes"].get("playParams")) - - def get_playlist_tags( - self, - playlist_metadata: dict, - media_metadata: dict, - ) -> PlaylistTags: - playlist_track = ( - playlist_metadata["relationships"]["tracks"]["data"].index(media_metadata) - + 1 - ) - - return PlaylistTags( - playlist_artist=playlist_metadata["attributes"].get( - "curatorName", "Unknown" - ), - playlist_id=playlist_metadata["attributes"]["playParams"]["id"], - playlist_title=playlist_metadata["attributes"]["name"], - playlist_track=playlist_track, + log = log.debug( + "success", + full_nm3u8dlre_path=self.full_nm3u8dlre_path, + full_mp4decrypt_path=self.full_mp4decrypt_path, + full_ffmpeg_path=self.full_ffmpeg_path, + full_mp4box_path=self.full_mp4box_path, ) def get_temp_path( @@ -126,13 +91,19 @@ class AppleMusicBaseDownloader: file_tag: str, file_extension: str, ) -> str: - return str( + log = logger.bind(action="get_temp_path") + + temp_path = str( Path(self.temp_path) / TEMP_PATH_TEMPLATE.format(folder_tag) / (f"{media_id}_{file_tag}" + file_extension) ) - def sanitize_string( + log.debug("success", temp_path=temp_path) + + return temp_path + + def _sanitize_string( self, dirty_string: str, file_ext: str = None, @@ -160,6 +131,8 @@ class AppleMusicBaseDownloader: file_extension: str, playlist_tags: PlaylistTags | None, ) -> str: + log = logger.bind(action="get_final_path") + if tags.album: template_folder_parts = ( self.compilation_folder_template.split("/") @@ -217,29 +190,39 @@ class AppleMusicBaseDownloader: track=(tags.track, ""), track_total=(tags.track_total, ""), ) - sanitized_formatted_part = self.sanitize_string( + sanitized_formatted_part = self._sanitize_string( formatted_part, file_extension if not is_folder else None, ) formatted_parts.append(sanitized_formatted_part) - return str(Path(self.output_path, *formatted_parts)) + final_path = str(Path(self.output_path, *formatted_parts)) + + log.debug("success", final_path=final_path) + + return final_path async def download_stream(self, stream_url: str, download_path: str): + log = logger.bind( + action="download_stream", stream_url=stream_url, download_path=download_path + ) + if self.download_mode == DownloadMode.YTDLP: - await self.download_ytdlp(stream_url, download_path) + await self._download_ytdlp_async(stream_url, download_path) if self.download_mode == DownloadMode.NM3U8DLRE: - await self.download_nm3u8dlre(stream_url, download_path) + await self._download_nm3u8dlre(stream_url, download_path) - async def download_ytdlp(self, stream_url: str, download_path: str) -> None: + log.debug("success") + + async def _download_ytdlp_async(self, stream_url: str, download_path: str) -> None: await asyncio.to_thread( - self._download_ytdlp, + self._download_ytdlp_sync, stream_url, download_path, ) - def _download_ytdlp(self, stream_url: str, download_path: str) -> None: + def _download_ytdlp_sync(self, stream_url: str, download_path: str) -> None: with YoutubeDL( { "quiet": True, @@ -254,7 +237,7 @@ class AppleMusicBaseDownloader: ) as ydl: ydl.download(stream_url) - async def download_nm3u8dlre(self, stream_url: str, download_path: str): + async def _download_nm3u8dlre(self, stream_url: str, download_path: str): download_path_obj = Path(download_path) download_path_obj.parent.mkdir(parents=True, exist_ok=True) @@ -278,11 +261,12 @@ class AppleMusicBaseDownloader: async def apply_tags( self, - media_path: Path, + media_path: str, tags: MediaTags, cover_bytes: bytes | None, - extra_tags: dict | None = None, ): + log = logger.bind(action="apply_tags", media_path=media_path) + exclude_tags = self.exclude_tags or [] filtered_tags = MediaTags( @@ -297,21 +281,21 @@ class AppleMusicBaseDownloader: skip_tagging = "all" in exclude_tags await asyncio.to_thread( - self.apply_mp4_tags, + self._apply_mp4_tags, media_path, mp4_tags, cover_bytes, skip_tagging, - extra_tags, ) - def apply_mp4_tags( + log.debug("success") + + def _apply_mp4_tags( self, - media_path: Path, + media_path: str, tags: dict, cover_bytes: bytes | None, skip_tagging: bool, - extra_tags: dict | None, ): mp4 = MP4(media_path) mp4.clear() @@ -323,14 +307,12 @@ class AppleMusicBaseDownloader: data=cover_bytes, imageformat=( MP4Cover.FORMAT_JPEG - if self.cover_format == CoverFormat.JPG + if self.interface.base.cover_format == CoverFormat.JPG else MP4Cover.FORMAT_PNG ), ) ] mp4.update(tags) - if extra_tags: - mp4.update(extra_tags) mp4.save() @@ -347,82 +329,41 @@ class AppleMusicBaseDownloader: data=cover_bytes, imageformat=( MP4Cover.FORMAT_JPEG - if self.cover_format == CoverFormat.JPG + if self.interface.base.cover_format == CoverFormat.JPG else MP4Cover.FORMAT_PNG ), ) ] - def move_to_final_path(self, stage_path: str, final_path: str) -> None: - Path(final_path).parent.mkdir(parents=True, exist_ok=True) - shutil.move(stage_path, final_path) - - def write_cover_image( - self, - cover_bytes: bytes, - cover_path: str, - ) -> None: - Path(cover_path).parent.mkdir(parents=True, exist_ok=True) - Path(cover_path).write_bytes(cover_bytes) - def get_playlist_file_path( self, tags: PlaylistTags, ) -> str: + log = logger.bind(action="get_playlist_file_path") + + template_folder_parts = self.playlist_folder_template.split("/") template_file_parts = self.playlist_file_template.split("/") + template_parts = template_folder_parts + template_file_parts formatted_parts = [] - for i, part in enumerate(template_file_parts): - is_folder = i < len(template_file_parts) - 1 + for i, part in enumerate(template_parts): + is_folder = i < len(template_parts) - 1 formatted_part = CustomStringFormatter().format( part, - playlist_artist=(tags.playlist_artist, "Unknown Playlist Artist"), + playlist_artist=(tags.artist, "Unknown Playlist Artist"), playlist_id=(tags.playlist_id, "Unknown Playlist ID"), - playlist_title=(tags.playlist_title, "Unknown Playlist Title"), - playlist_track=(tags.playlist_track, ""), + playlist_title=(tags.title, "Unknown Playlist Title"), + playlist_track=(tags.track, ""), ) - file_ext = None if is_folder else ".m3u8" - sanitized_formatted_part = self.sanitize_string( + file_ext = None if is_folder else ".m3u" + sanitized_formatted_part = self._sanitize_string( formatted_part, file_ext, ) formatted_parts.append(sanitized_formatted_part) - return str(Path(self.output_path, *formatted_parts)) + final_path = str(Path(self.output_path, *formatted_parts)) - def update_playlist_file( - self, - playlist_file_path: str, - final_path: str, - playlist_track: int, - ) -> None: - playlist_file_path_obj = Path(playlist_file_path) - final_path_obj = Path(final_path) - output_dir_obj = Path(self.output_path) + log.debug("success", playlist_file_path=final_path) - playlist_file_path_obj.parent.mkdir(parents=True, exist_ok=True) - playlist_file_path_parent_parts_len = len(playlist_file_path_obj.parent.parts) - output_path_parts_len = len(output_dir_obj.parts) - - final_path_relative = Path( - ("../" * (playlist_file_path_parent_parts_len - output_path_parts_len)), - *final_path_obj.parts[output_path_parts_len:], - ) - playlist_file_lines = ( - playlist_file_path_obj.open("r", encoding="utf8").readlines() - if playlist_file_path_obj.exists() - else [] - ) - if len(playlist_file_lines) < playlist_track: - playlist_file_lines.extend( - "\n" for _ in range(playlist_track - len(playlist_file_lines)) - ) - - playlist_file_lines[playlist_track - 1] = final_path_relative.as_posix() + "\n" - with playlist_file_path_obj.open("w", encoding="utf8") as playlist_file: - playlist_file.writelines(playlist_file_lines) - - def cleanup_temp(self, random_uuid: str) -> None: - temp_folder = Path(self.temp_path) / TEMP_PATH_TEMPLATE.format(random_uuid) - if temp_folder.exists(): - shutil.rmtree(temp_folder) + return final_path diff --git a/gamdl/downloader/constants.py b/gamdl/downloader/constants.py index eff48b7..d8a517c 100644 --- a/gamdl/downloader/constants.py +++ b/gamdl/downloader/constants.py @@ -1,46 +1,3 @@ -import re - TEMP_PATH_TEMPLATE = "gamdl_temp_{}" ILLEGAL_CHARS_RE = r'[\\/:*?"<>|;]' ILLEGAL_CHAR_REPLACEMENT = "_" - -SONG_MEDIA_TYPE = {"song", "songs", "library-songs"} -ALBUM_MEDIA_TYPE = {"album", "albums", "library-albums"} -MUSIC_VIDEO_MEDIA_TYPE = {"music-video", "music-videos", "library-music-videos"} -ARTIST_MEDIA_TYPE = {"artist", "artists", "library-artists"} -UPLOADED_VIDEO_MEDIA_TYPE = {"post", "uploaded-videos"} -PLAYLIST_MEDIA_TYPE = {"playlist", "playlists", "library-playlists"} - -ARTIST_AUTO_SELECT_KEY_MAP = { - "main-albums": ("views", "full-albums"), - "compilation-albums": ("views", "compilation-albums"), - "live-albums": ("views", "live-albums"), - "singles-eps": ("views", "singles"), - "all-albums": ("relationships", "albums"), - "top-songs": ("views", "top-songs"), - "music-videos": ("relationships", "music-videos"), -} -ARTIST_AUTO_SELECT_STR_MAP = { - "main-albums": "Main Albums", - "compilation-albums": "Compilation Albums", - "live-albums": "Live Albums", - "singles-eps": "Singles & EPs", - "all-albums": "All Albums", - "top-songs": "Top Songs", - "music-videos": "Music Videos", -} - -VALID_URL_PATTERN = re.compile( - r"https://(?:classical\.)?music\.apple\.com" - r"(?:" - r"/(?P[a-z]{2})" - r"/(?Partist|album|playlist|song|music-video|post)" - r"(?:/(?P[^\s/]+))?" - r"/(?P[0-9]+|pl\.[0-9a-z]{32}|pl\.u-[a-zA-Z0-9]+)" - r"(?:\?i=(?P[0-9]+))?" - r"|" - r"(?:/(?P[a-z]{2}))?" - r"/library/(?Pplaylist|albums)" - r"/(?Pp\.[a-zA-Z0-9]+|l\.[a-zA-Z0-9]+)" - r")" -) diff --git a/gamdl/downloader/downloader.py b/gamdl/downloader/downloader.py index c59f251..6d2abc4 100644 --- a/gamdl/downloader/downloader.py +++ b/gamdl/downloader/downloader.py @@ -1,617 +1,264 @@ import asyncio import typing from pathlib import Path +from typing import AsyncGenerator +import structlog +import shutil from InquirerPy import inquirer from InquirerPy.base.control import Choice -from ..api.exceptions import ApiError from ..interface import AppleMusicInterface -from ..utils import safe_gather -from .constants import ( - ALBUM_MEDIA_TYPE, - ARTIST_MEDIA_TYPE, - MUSIC_VIDEO_MEDIA_TYPE, - PLAYLIST_MEDIA_TYPE, - SONG_MEDIA_TYPE, - UPLOADED_VIDEO_MEDIA_TYPE, - VALID_URL_PATTERN, -) -from .downloader_base import AppleMusicBaseDownloader -from .downloader_music_video import AppleMusicMusicVideoDownloader -from .downloader_song import AppleMusicSongDownloader -from .downloader_uploaded_video import AppleMusicUploadedVideoDownloader +from .base import AppleMusicBaseDownloader from .enums import ArtistAutoSelect, DownloadMode, RemuxMode from .exceptions import ( - ExecutableNotFound, - FormatNotAvailable, - MediaFileExists, - NotStreamable, - SyncedLyricsOnly, - UnsupportedMediaType, + GamdlDownloaderSyncedLyricsOnlyError, + GamdlDownloaderMediaFileExistsError, + GamdlDownloaderDependencyNotFoundError, ) -from .types import DownloadItem, UrlInfo +from .music_video import AppleMusicMusicVideoDownloader +from .song import AppleMusicSongDownloader +from .types import DownloadItem +from .constants import TEMP_PATH_TEMPLATE +from .uploaded_video import AppleMusicUploadedVideoDownloader + +logger = structlog.get_logger(__name__) class AppleMusicDownloader: def __init__( self, - interface: AppleMusicInterface, - base_downloader: AppleMusicBaseDownloader, - song_downloader: AppleMusicSongDownloader, - music_video_downloader: AppleMusicMusicVideoDownloader, - uploaded_video_downloader: AppleMusicUploadedVideoDownloader, - artist_auto_select: ArtistAutoSelect | None = None, - skip_music_videos: bool = False, + song: AppleMusicSongDownloader, + music_video: AppleMusicMusicVideoDownloader, + uploaded_video: AppleMusicUploadedVideoDownloader, + overwrite: bool = False, + save_cover: bool = False, + save_playlist: bool = False, + no_synced_lyrics: bool = False, + synced_lyrics_only: bool = False, + skip_cleanup: bool = False, skip_processing: bool = False, - flat_filter: typing.Callable = None, ): - self.interface = interface - self.base_downloader = base_downloader - self.song_downloader = song_downloader - self.music_video_downloader = music_video_downloader - self.uploaded_video_downloader = uploaded_video_downloader - self.artist_auto_select = artist_auto_select - self.skip_music_videos = skip_music_videos + self.song = song + self.music_video = music_video + self.uploaded_video = uploaded_video + self.overwrite = overwrite + self.save_cover = save_cover + self.save_playlist = save_playlist + self.no_synced_lyrics = no_synced_lyrics + self.synced_lyrics_only = synced_lyrics_only + self.skip_cleanup = skip_cleanup self.skip_processing = skip_processing - self.flat_filter = flat_filter - async def get_single_download_item( + self.base = song.base + + async def get_download_item_from_url( self, - media_metadata: dict, - playlist_metadata: dict = None, - ) -> DownloadItem: - if self.flat_filter: - flat_filter_result = self.flat_filter(media_metadata) - if asyncio.iscoroutine(flat_filter_result): - flat_filter_result = await flat_filter_result + url: str, + ) -> AsyncGenerator[DownloadItem, None]: + async for media in self.base.interface.get_media_from_url(url): + if media.error or media.flat_filter_result: + yield DownloadItem(media) - if flat_filter_result: - return DownloadItem( - media_metadata=media_metadata, - playlist_metadata=playlist_metadata, - flat_filter_result=flat_filter_result, - ) + elif media.media_metadata["type"] in {"songs", "library-songs"}: + yield await self.song.get_download_item(media) - return await self.get_single_download_item_no_filter( - media_metadata, - playlist_metadata, - ) + elif media.media_metadata["type"] in { + "music-videos", + "library-music-videos", + }: + yield await self.music_video.get_download_item(media) - async def get_single_download_item_no_filter( - self, - media_metadata: dict, - playlist_metadata: dict = None, - ) -> DownloadItem: + elif media.media_metadata["type"] in {"uploaded-videos"}: + yield await self.uploaded_video.get_download_item(media) + + async def download(self, item: DownloadItem) -> None: try: - if not self.base_downloader.is_media_streamable( - media_metadata, - ): - raise NotStreamable(media_metadata["id"]) + if item.media.error: + raise item.media.error - if media_metadata["type"] in SONG_MEDIA_TYPE: - if not self.song_downloader: - raise UnsupportedMediaType(media_metadata["type"]) - - download_item = await self.song_downloader.get_download_item( - media_metadata, - playlist_metadata, - ) - - if media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE: - if not self.music_video_downloader: - raise UnsupportedMediaType(media_metadata["type"]) - - download_item = await self.music_video_downloader.get_download_item( - media_metadata, - playlist_metadata, - ) - - if media_metadata["type"] in UPLOADED_VIDEO_MEDIA_TYPE: - if not self.uploaded_video_downloader: - raise UnsupportedMediaType(media_metadata["type"]) - - download_item = await self.uploaded_video_downloader.get_download_item( - media_metadata, - ) - except Exception as e: - download_item = DownloadItem( - media_metadata=media_metadata, - playlist_metadata=playlist_metadata, - error=e, - ) - - return download_item - - async def get_collection_download_items( - self, - collection_metadata: dict, - ) -> list[DownloadItem]: - tracks_metadata = collection_metadata["relationships"]["tracks"]["data"] - async for extended_data in self.interface.apple_music_api.extend_api_data( - collection_metadata["relationships"]["tracks"], - ): - tracks_metadata.extend(extended_data["data"]) - - tasks = [ - self.get_single_download_item( - media_metadata, - ( - collection_metadata - if collection_metadata["type"] in PLAYLIST_MEDIA_TYPE - else None - ), - ) - for media_metadata in tracks_metadata - ] - - download_items = await safe_gather(*tasks) - return download_items - - async def get_artist_download_items( - self, - artist_metadata: dict, - ) -> list[DownloadItem]: - if not self.artist_auto_select: - available_choices = [] - for artist_auto_select_option in list(ArtistAutoSelect): - relation_key, type_key = artist_auto_select_option.path_key - available_choices.append( - Choice( - name=str(artist_auto_select_option), - value=(artist_auto_select_option,), - ), - ) - - (artist_auto_select,) = await inquirer.select( - message=f'Select which type to download for artist "{artist_metadata["attributes"]["name"]}":', - choices=available_choices, - validate=lambda result: artist_metadata.get(result[0].path_key[0], {}) - .get(result[0].path_key[1], {}) - .get("data"), - ).execute_async() - else: - artist_auto_select = self.artist_auto_select - - relation_key, type_key = artist_auto_select.path_key - async for extended_data in self.interface.apple_music_api.extend_api_data( - artist_metadata[relation_key][type_key], - ): - artist_metadata[relation_key][type_key]["data"].extend(extended_data["data"]) - - selected_items = artist_metadata[relation_key][type_key]["data"] - select_all = self.artist_auto_select is not None - - if artist_auto_select in { - ArtistAutoSelect.MAIN_ALBUMS, - ArtistAutoSelect.COMPILATION_ALBUMS, - ArtistAutoSelect.LIVE_ALBUMS, - ArtistAutoSelect.SINGLES_EPS, - ArtistAutoSelect.ALL_ALBUMS, - }: - return await self.get_artist_albums_download_items( - selected_items, - select_all, - ) - elif artist_auto_select == ArtistAutoSelect.TOP_SONGS: - return await self.get_artist_songs_download_items( - selected_items, - select_all, - ) - elif artist_auto_select == ArtistAutoSelect.MUSIC_VIDEOS: - return await self.get_artist_music_videos_download_items( - selected_items, - select_all, - ) - - async def get_artist_albums_download_items( - self, - albums_metadata: list[dict], - select_all: bool = False, - ) -> list[DownloadItem]: - if not select_all: - choices = [ - Choice( - name=" | ".join( - [ - f'{album["attributes"]["trackCount"]:03d}', - f'{album["attributes"]["releaseDate"]:<10}', - f'{album["attributes"].get("contentRating", "None").title():<8}', - f'{album["attributes"]["name"]}', - ] - ), - value=album, - ) - for album in albums_metadata - if album.get("attributes") - ] - selected = await inquirer.select( - message="Select which albums to download: (Track Count | Release Date | Rating | Title)", - choices=choices, - multiselect=True, - ).execute_async() - else: - selected = albums_metadata - - download_items = [] - - album_tasks = [ - self.interface.apple_music_api.get_album(album_metadata["id"]) - for album_metadata in selected - ] - album_responses = await safe_gather(*album_tasks) - - track_tasks = [ - self.get_collection_download_items(album_response["data"][0]) - for album_response in album_responses - ] - track_results = await safe_gather(*track_tasks) - - for track_result in track_results: - download_items.extend(track_result) - - return download_items - - async def get_artist_music_videos_download_items( - self, - music_videos_metadata: list[dict], - select_all: bool = False, - ) -> list[DownloadItem]: - if not select_all: - choices = [ - Choice( - name=" | ".join( - [ - self.millis_to_min_sec( - music_video["attributes"]["durationInMillis"] - ), - f'{music_video["attributes"].get("contentRating", "None").title():<8}', - music_video["attributes"]["name"], - ], - ), - value=music_video, - ) - for music_video in music_videos_metadata - if music_video.get("attributes") - ] - selected = await inquirer.select( - message="Select which music videos to download: (Duration | Rating | Title)", - choices=choices, - multiselect=True, - ).execute_async() - else: - selected = music_videos_metadata - - music_video_tasks = [ - self.get_single_download_item( - music_video_metadata, - ) - for music_video_metadata in selected - ] - download_items = await safe_gather(*music_video_tasks) - - return download_items - - async def get_artist_songs_download_items( - self, - songs_metadata: list[dict], - select_all: bool = False, - ) -> list[DownloadItem]: - if not select_all: - choices = [ - Choice( - name=" | ".join( - [ - self.millis_to_min_sec( - song["attributes"]["durationInMillis"] - ), - f'{song["attributes"].get("contentRating", "None").title():<8}', - song["attributes"]["name"], - ], - ), - value=song, - ) - for song in songs_metadata - if song.get("attributes") - ] - selected = await inquirer.select( - message="Select which songs to download: (Duration | Rating | Title)", - choices=choices, - multiselect=True, - ).execute_async() - else: - selected = songs_metadata - - song_tasks = [ - self.get_single_download_item( - song_metadata, - ) - for song_metadata in selected - ] - download_items = await safe_gather(*song_tasks) - - return download_items - - def millis_to_min_sec(self, millis) -> str: - minutes, seconds = divmod(millis // 1000, 60) - return f"{minutes:02}:{seconds:02}" - - def get_url_info(self, url: str) -> UrlInfo | None: - match = VALID_URL_PATTERN.match(url) - if not match: - return None - - return UrlInfo( - **match.groupdict(), - ) - - async def get_download_queue( - self, - url_info: UrlInfo, - ) -> list[DownloadItem] | None: - return await self._get_download_queue( - "song" if url_info.sub_id else url_info.type or url_info.library_type, - url_info.sub_id or url_info.id or url_info.library_id, - url_info.library_id is not None, - ) - - async def _get_download_queue( - self, - url_type: str, - id: str, - is_library: bool, - ) -> list[DownloadItem] | None: - download_items = [] - - if url_type in ARTIST_MEDIA_TYPE: - try: - artist_response = await self.interface.apple_music_api.get_artist( - id, - ) - except ApiError as e: - if e.status_code == 404: - return None - raise e - - if artist_response is None: - return None - - download_items = await self.get_artist_download_items( - artist_response["data"][0], - ) - - if url_type in SONG_MEDIA_TYPE: - try: - song_respose = await self.interface.apple_music_api.get_song(id) - except ApiError as e: - if e.status_code == 404: - return None - raise e - - if song_respose is None: - return None - - download_items.append( - await self.get_single_download_item(song_respose["data"][0]) - ) - - if url_type in ALBUM_MEDIA_TYPE: - try: - if is_library: - album_response = ( - await self.interface.apple_music_api.get_library_album(id) - ) - else: - album_response = await self.interface.apple_music_api.get_album(id) - except ApiError as e: - if e.status_code == 404: - return None - raise e - - if album_response is None: - return None - - download_items = await self.get_collection_download_items( - album_response["data"][0], - ) - - if url_type in PLAYLIST_MEDIA_TYPE: - try: - if is_library: - playlist_response = ( - await self.interface.apple_music_api.get_library_playlist(id) - ) - else: - playlist_response = ( - await self.interface.apple_music_api.get_playlist(id) - ) - except ApiError as e: - if e.status_code == 404: - return None - raise e - - if playlist_response is None: - return None - - download_items = await self.get_collection_download_items( - playlist_response["data"][0], - ) - - if url_type in MUSIC_VIDEO_MEDIA_TYPE: - try: - music_video_response = ( - await self.interface.apple_music_api.get_music_video(id) - ) - except ApiError as e: - if e.status_code == 404: - return None - raise e - - if music_video_response is None: - return None - - download_items.append( - await self.get_single_download_item(music_video_response["data"][0]) - ) - - if url_type in UPLOADED_VIDEO_MEDIA_TYPE: - try: - uploaded_video = ( - await self.interface.apple_music_api.get_uploaded_video(id) - ) - except ApiError as e: - if e.status_code == 404: - return None - raise e - - if uploaded_video is None: - return None - - download_items.append( - await self.get_single_download_item(uploaded_video["data"][0]) - ) - - return download_items - - async def download( - self, - download_item: DownloadItem, - ) -> DownloadItem: - try: - if download_item.flat_filter_result: - download_item = await self.get_single_download_item_no_filter( - download_item.media_metadata, - download_item.playlist_metadata, - ) - - if download_item.error: - raise download_item.error - - await self._initial_processing(download_item) - await self._download(download_item) - await self._final_processing(download_item) - - return download_item + await self._initial_processing(item) + await self._download(item) + await self._final_processing(item) finally: - if isinstance(download_item, DownloadItem) and not self.skip_processing: - self.base_downloader.cleanup_temp(download_item.random_uuid) + self._cleanup_temp(item.uuid_) - async def _download( + def _update_playlist_file( self, - download_item: DownloadItem, + playlist_file_path: str, + final_path: str, + playlist_track: int, ) -> None: - if ( - self.song_downloader.synced_lyrics_only - and download_item.media_metadata["type"] not in SONG_MEDIA_TYPE - ): - raise SyncedLyricsOnly() + log = logger.bind( + action="update_playlist_file", + playlist_file_path=playlist_file_path, + final_path=final_path, + playlist_track=playlist_track, + ) - if self.song_downloader.synced_lyrics_only: - return + playlist_file_path_obj = Path(playlist_file_path) + final_path_obj = Path(final_path) + output_dir_obj = Path(self.base.output_path) - if ( - Path(download_item.final_path).exists() - and not self.base_downloader.overwrite - ): - raise MediaFileExists(download_item.final_path) + playlist_file_path_obj.parent.mkdir(parents=True, exist_ok=True) + playlist_file_path_parent_parts_len = len(playlist_file_path_obj.parent.parts) + output_path_parts_len = len(output_dir_obj.parts) - if ( - self.base_downloader.download_mode == DownloadMode.NM3U8DLRE - and not self.base_downloader.full_nm3u8dlre_path - ): - raise ExecutableNotFound("N_m3u8DL-RE") - - if download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE: - if ( - self.music_video_downloader.remux_mode == RemuxMode.FFMPEG - and not self.base_downloader.full_ffmpeg_path - ): - raise ExecutableNotFound("ffmpeg") - - if ( - self.music_video_downloader.remux_mode == RemuxMode.MP4BOX - and not self.base_downloader.full_mp4box_path - ): - raise ExecutableNotFound("MP4Box") - - if ( - download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE - or self.music_video_downloader.remux_mode == RemuxMode.MP4BOX - ) and not self.base_downloader.full_mp4decrypt_path: - raise ExecutableNotFound("mp4decrypt") - - if ( - not download_item.stream_info - or not download_item.stream_info.audio_track - or not download_item.stream_info.audio_track.stream_url - or ( - ( - not download_item.decryption_key - or not download_item.decryption_key.audio_track - or not download_item.decryption_key.audio_track.key - ) - and not self.base_downloader.use_wrapper + final_path_relative = Path( + ("../" * (playlist_file_path_parent_parts_len - output_path_parts_len)), + *final_path_obj.parts[output_path_parts_len:], + ) + playlist_file_lines = ( + playlist_file_path_obj.open("r", encoding="utf8").readlines() + if playlist_file_path_obj.exists() + else [] + ) + if len(playlist_file_lines) < playlist_track: + playlist_file_lines.extend( + "\n" for _ in range(playlist_track - len(playlist_file_lines)) ) - ): - raise FormatNotAvailable(download_item.media_metadata["id"]) - if download_item.media_metadata["type"] in SONG_MEDIA_TYPE: - await self.song_downloader.download(download_item) + playlist_file_lines[playlist_track - 1] = final_path_relative.as_posix() + "\n" + with playlist_file_path_obj.open("w", encoding="utf8") as playlist_file: + playlist_file.writelines(playlist_file_lines) - if download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE: - await self.music_video_downloader.download(download_item) + log.debug("success") - if download_item.media_metadata["type"] in UPLOADED_VIDEO_MEDIA_TYPE: - await self.uploaded_video_downloader.download(download_item) + def _write_cover(self, cover_path: str, cover_bytes: bytes) -> None: + log = logger.bind(action="write_cover_file", cover_path=cover_path) - async def _initial_processing( - self, - download_item: DownloadItem, - ) -> None: + Path(cover_path).parent.mkdir(parents=True, exist_ok=True) + with open(cover_path, "wb") as f: + f.write(cover_bytes) + + log.debug("success") + + def _write_synced_lyrics(self, synced_lyrics_path: str, lyrics: str) -> None: + log = logger.bind( + action="write_synced_lyrics", + synced_lyrics_path=synced_lyrics_path, + ) + + Path(synced_lyrics_path).parent.mkdir(parents=True, exist_ok=True) + with open(synced_lyrics_path, "w", encoding="utf-8") as f: + f.write(lyrics) + + log.debug("success") + + async def _initial_processing(self, item: DownloadItem) -> None: if self.skip_processing: return - if download_item.cover_path and self.base_downloader.save_cover: - cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url) - if cover_bytes and ( - self.base_downloader.overwrite - or not Path(download_item.cover_path).exists() - ): - self.base_downloader.write_cover_image( + if item.playlist_file_path and item.final_path and self.save_playlist: + self._update_playlist_file( + item.playlist_file_path, + item.final_path, + item.media.playlist_tags.track, + ) + + if item.cover_path and self.save_cover and item.media.cover.url: + cover_bytes = await self.base.interface.base.get_cover_bytes( + item.media.cover.url, + ) + if cover_bytes and (self.overwrite or not Path(item.cover_path).exists()): + self._write_cover( + item.cover_path, cover_bytes, - download_item.cover_path, ) if ( - download_item.lyrics - and download_item.lyrics.synced - and not self.song_downloader.no_synced_lyrics - and ( - self.base_downloader.overwrite - or not Path(download_item.synced_lyrics_path).exists() - ) + item.synced_lyrics_path + and not self.no_synced_lyrics + and item.media.lyrics + and item.media.lyrics.synced + and (self.overwrite or not Path(item.synced_lyrics_path).exists()) ): - self.song_downloader.write_synced_lyrics( - download_item.lyrics.synced, - download_item.synced_lyrics_path, + self._write_synced_lyrics( + item.synced_lyrics_path, + item.media.lyrics.synced, ) - if download_item.playlist_tags and self.base_downloader.save_playlist: - self.base_downloader.update_playlist_file( - download_item.playlist_file_path, - download_item.final_path, - download_item.playlist_tags.playlist_track, + async def _download(self, item: DownloadItem) -> None: + if item.media.error: + raise item.media.error + + if self.synced_lyrics_only: + raise GamdlDownloaderSyncedLyricsOnlyError( + "Download mode is set to synced lyrics only" ) + if Path(item.final_path).exists() and not self.overwrite: + raise GamdlDownloaderMediaFileExistsError(item.final_path) + + if item.media.media_metadata["type"] in { + "music-videos", + "library-music-videos", + "songs", + "library-songs", + }: + if ( + self.base.download_mode == DownloadMode.NM3U8DLRE + and not self.base.full_nm3u8dlre_path + ): + raise GamdlDownloaderDependencyNotFoundError("N_m3u8DL-RE") + + if item.media.media_metadata["type"] in {"songs", "library-songs"}: + await self.song.download(item) + + elif item.media.media_metadata["type"] in { + "music-videos", + "library-music-videos", + }: + if ( + self.music_video.remux_mode == RemuxMode.FFMPEG + and not self.base.full_ffmpeg_path + ): + raise GamdlDownloaderDependencyNotFoundError("FFmpeg") + + if ( + self.music_video.remux_mode == RemuxMode.MP4BOX + and not self.base.full_mp4box_path + and not self.base.full_mp4decrypt_path + ): + raise GamdlDownloaderDependencyNotFoundError( + "MP4Box and/or mp4decrypt" + ) + + await self.music_video.download(item) + + elif item.media.media_metadata["type"] in {"uploaded-videos"}: + await self.uploaded_video.download(item) + + def _move_to_final_path(self, staged_path: str, final_path: str) -> None: + log = logger.bind( + action="move_to_final_path", + staged_path=staged_path, + final_path=final_path, + ) + + Path(final_path).parent.mkdir(parents=True, exist_ok=True) + shutil.move(staged_path, final_path) + + log.debug("success") + async def _final_processing( self, - download_item: DownloadItem, + item: DownloadItem, ) -> None: if self.skip_processing: return - if download_item.staged_path and Path(download_item.staged_path).exists(): - self.base_downloader.move_to_final_path( - download_item.staged_path, - download_item.final_path, + if Path(item.staged_path).exists(): + self._move_to_final_path( + item.staged_path, + item.final_path, ) + + def _cleanup_temp(self, folder_tag: str) -> None: + log = logger.bind(action="cleanup_temp", folder_tag=folder_tag) + + temp_path = Path(self.base.temp_path) / TEMP_PATH_TEMPLATE.format(folder_tag) + if temp_path.exists() and temp_path.is_dir() and not self.skip_cleanup: + shutil.rmtree(temp_path, ignore_errors=True) + + log.debug("success") diff --git a/gamdl/downloader/downloader_music_video.py b/gamdl/downloader/downloader_music_video.py deleted file mode 100644 index bbfa6e0..0000000 --- a/gamdl/downloader/downloader_music_video.py +++ /dev/null @@ -1,285 +0,0 @@ -from pathlib import Path - -from ..interface.enums import CoverFormat, MusicVideoCodec, MusicVideoResolution -from ..interface.interface_music_video import AppleMusicMusicVideoInterface -from ..interface.types import DecryptionKeyAv -from ..utils import async_subprocess -from .downloader_base import AppleMusicBaseDownloader -from .enums import RemuxFormatMusicVideo, RemuxMode -from .types import DownloadItem - - -class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader): - def __init__( - self, - base_downloader: AppleMusicBaseDownloader, - interface: AppleMusicMusicVideoInterface, - codec_priority: list[MusicVideoCodec] = [ - MusicVideoCodec.H264, - MusicVideoCodec.H265, - ], - remux_mode: RemuxMode = RemuxMode.FFMPEG, - remux_format: RemuxFormatMusicVideo = RemuxFormatMusicVideo.M4V, - resolution: MusicVideoResolution = MusicVideoResolution.R1080P, - ): - self.__dict__.update(base_downloader.__dict__) - self.interface = interface - self.codec_priority = codec_priority - self.remux_mode = remux_mode - self.remux_format = remux_format - self.resolution = resolution - - async def remux_mp4box( - self, - input_path_video: str, - input_path_audio: str, - output_path: str, - ): - await async_subprocess( - self.full_mp4box_path, - "-quiet", - "-add", - input_path_audio, - "-add", - input_path_video, - "-itags", - "artist=placeholder", - "-keep-utc", - "-new", - output_path, - silent=self.silent, - ) - - async def remux_ffmpeg( - self, - input_path_video: str, - input_path_audio: str, - output_path: str, - decryption_key: str = None, - ): - if decryption_key: - key = [ - "-decryption_key", - decryption_key, - ] - else: - key = [] - - await async_subprocess( - self.full_ffmpeg_path, - "-loglevel", - "error", - "-y", - *key, - "-i", - input_path_video, - "-i", - input_path_audio, - "-c", - "copy", - "-c:s", - "mov_text", - "-movflags", - "+faststart", - output_path, - silent=self.silent, - ) - - async def decrypt_mp4decrypt( - self, - input_path: str, - output_path: str, - decryption_key: str, - ): - await async_subprocess( - self.full_mp4decrypt_path, - "--key", - f"1:{decryption_key}", - input_path, - output_path, - silent=self.silent, - ) - - async def stage( - self, - encrypted_path_video: str, - encrypted_path_audio: str, - decrypted_path_video: str, - decrypted_path_audio: str, - staged_path: str, - decryption_key: DecryptionKeyAv, - ): - await self.decrypt_mp4decrypt( - encrypted_path_video, - decrypted_path_video, - decryption_key.video_track.key, - ) - await self.decrypt_mp4decrypt( - encrypted_path_audio, - decrypted_path_audio, - decryption_key.audio_track.key, - ) - - if self.remux_mode == RemuxMode.MP4BOX: - await self.remux_mp4box( - decrypted_path_video, - decrypted_path_audio, - staged_path, - ) - else: - await self.remux_ffmpeg( - decrypted_path_video, - decrypted_path_audio, - staged_path, - ) - - def get_cover_path( - self, - final_path: str, - file_extension: str, - ) -> str: - return str(Path(final_path).with_suffix(file_extension)) - - async def get_download_item( - self, - music_video_metadata: dict, - playlist_metadata: dict = None, - ) -> DownloadItem: - download_item = DownloadItem() - - download_item.media_metadata = music_video_metadata - download_item.playlist_metadata = playlist_metadata - - music_video_id = self.interface.get_media_id_of_library_media( - music_video_metadata, - ) - - itunes_page_metadata = await self.interface.get_itunes_page_metadata( - music_video_metadata, - ) - download_item.media_tags = await self.interface.get_tags( - music_video_metadata, - itunes_page_metadata, - ) - - if playlist_metadata: - download_item.playlist_tags = self.get_playlist_tags( - playlist_metadata, - music_video_metadata, - ) - download_item.playlist_file_path = self.get_playlist_file_path( - download_item.playlist_tags, - ) - - download_item.stream_info = await self.interface.get_stream_info( - music_video_metadata, - itunes_page_metadata, - self.codec_priority, - self.resolution, - ) - - download_item.decryption_key = await self.interface.get_decryption_key( - download_item.stream_info, - self.cdm, - ) - - download_item.random_uuid = self.get_random_uuid() - download_item.staged_path = self.get_temp_path( - music_video_id, - download_item.random_uuid, - "staged", - ( - "." - + ( - "mp4" - if self.remux_format == RemuxFormatMusicVideo.MP4 - else download_item.stream_info.file_format.value - ) - ), - ) - download_item.final_path = self.get_final_path( - download_item.media_tags, - Path(download_item.staged_path).suffix, - playlist_metadata, - ) - - download_item.cover_url_template = self.interface.get_cover_url_template( - music_video_metadata, - self.cover_format, - ) - download_item.cover_url = self.interface.get_cover_url( - download_item.cover_url_template, - self.cover_size, - self.cover_format, - ) - - cover_file_extension = await self.interface.get_cover_file_extension( - download_item.cover_url, - self.cover_format, - ) - if cover_file_extension: - download_item.cover_path = self.get_cover_path( - download_item.final_path, - cover_file_extension, - ) - - return download_item - - async def download( - self, - download_item: DownloadItem, - ) -> None: - encrypted_path_video = self.get_temp_path( - download_item.media_metadata["id"], - download_item.random_uuid, - "encrypted_video", - ".mp4", - ) - encrypted_path_audio = self.get_temp_path( - download_item.media_metadata["id"], - download_item.random_uuid, - "encrypted_audio", - ".m4a", - ) - - await self.download_stream( - download_item.stream_info.video_track.stream_url, - encrypted_path_video, - ) - await self.download_stream( - download_item.stream_info.audio_track.stream_url, - encrypted_path_audio, - ) - - decrypted_path_video = self.get_temp_path( - download_item.media_metadata["id"], - download_item.random_uuid, - "decrypted_video", - ".mp4", - ) - decrypted_path_audio = self.get_temp_path( - download_item.media_metadata["id"], - download_item.random_uuid, - "decrypted_audio", - ".m4a", - ) - - await self.stage( - encrypted_path_video, - encrypted_path_audio, - decrypted_path_video, - decrypted_path_audio, - download_item.staged_path, - download_item.decryption_key, - ) - - cover_bytes = ( - await self.interface.get_cover_bytes(download_item.cover_url) - if self.cover_format != CoverFormat.RAW - else None - ) - await self.apply_tags( - download_item.staged_path, - download_item.media_tags, - cover_bytes, - ) diff --git a/gamdl/downloader/downloader_song.py b/gamdl/downloader/downloader_song.py deleted file mode 100644 index 42e3aab..0000000 --- a/gamdl/downloader/downloader_song.py +++ /dev/null @@ -1,248 +0,0 @@ -from pathlib import Path - -from ..interface.enums import CoverFormat, SongCodec, SyncedLyricsFormat -from ..interface.interface_song import AppleMusicSongInterface -from ..interface.types import DecryptionKeyAv -from .amdecrypt import decrypt_file, decrypt_file_hex -from .downloader_base import AppleMusicBaseDownloader -from .types import DownloadItem - - -class AppleMusicSongDownloader(AppleMusicBaseDownloader): - def __init__( - self, - base_downloader: AppleMusicBaseDownloader, - interface: AppleMusicSongInterface, - codec_priority: SongCodec = [SongCodec.AAC_LEGACY], - synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC, - no_synced_lyrics: bool = False, - synced_lyrics_only: bool = False, - use_album_date: bool = False, - fetch_extra_tags: bool = False, - ): - self.__dict__.update(base_downloader.__dict__) - self.interface = interface - self.codec_priority = codec_priority - self.synced_lyrics_format = synced_lyrics_format - self.no_synced_lyrics = no_synced_lyrics - self.synced_lyrics_only = synced_lyrics_only - self.use_album_date = use_album_date - self.fetch_extra_tags = fetch_extra_tags - - async def get_download_item( - self, - song_metadata: dict, - playlist_metadata: dict = None, - ) -> DownloadItem: - download_item = DownloadItem() - - download_item.media_metadata = song_metadata - download_item.playlist_metadata = playlist_metadata - - song_id = self.interface.get_media_id_of_library_media(song_metadata) - - download_item.lyrics = await self.interface.get_lyrics( - song_metadata, - self.synced_lyrics_format, - ) - - webplayback = await self.interface.apple_music_api.get_webplayback(song_id) - download_item.media_tags = await self.interface.get_tags( - webplayback, - download_item.lyrics.unsynced if download_item.lyrics else None, - self.use_album_date, - ) - if self.fetch_extra_tags: - download_item.extra_tags = await self.interface.get_extra_tags( - song_metadata, - ) - - if playlist_metadata: - download_item.playlist_tags = self.get_playlist_tags( - playlist_metadata, - song_metadata, - ) - download_item.playlist_file_path = self.get_playlist_file_path( - download_item.playlist_tags, - ) - - download_item.final_path = self.get_final_path( - download_item.media_tags, - ".m4a", - download_item.playlist_tags, - ) - download_item.synced_lyrics_path = self.get_lyrics_synced_path( - download_item.final_path, - ) - - if self.synced_lyrics_only: - return download_item - - for codec in self.codec_priority: - download_item.stream_info = await self.interface.get_stream_info( - codec, - song_metadata, - webplayback, - ) - if download_item.stream_info: - break - - if download_item.stream_info.audio_track.legacy: - download_item.decryption_key = ( - await self.interface.get_decryption_key_legacy( - download_item.stream_info, - self.cdm, - ) - ) - elif ( - not self.use_wrapper - and download_item.stream_info - and download_item.stream_info.audio_track.widevine_pssh - ): - download_item.decryption_key = await self.interface.get_decryption_key( - download_item.stream_info, - self.cdm, - ) - - download_item.cover_url_template = self.interface.get_cover_url_template( - song_metadata, - self.cover_format, - ) - download_item.cover_url = self.interface.get_cover_url( - download_item.cover_url_template, - self.cover_size, - self.cover_format, - ) - - download_item.random_uuid = self.get_random_uuid() - if download_item.stream_info and download_item.stream_info.file_format: - download_item.staged_path = self.get_temp_path( - song_id, - download_item.random_uuid, - "staged", - "." + download_item.stream_info.file_format.value, - ) - else: - download_item.staged_path = None - - cover_file_extension = await self.interface.get_cover_file_extension( - download_item.cover_url, - self.cover_format, - ) - if cover_file_extension: - download_item.cover_path = self.get_cover_path( - download_item.final_path, - cover_file_extension, - ) - - return download_item - - async def decrypt_amdecrypt( - self, - input_path: str, - output_path: str, - media_id: str, - fairplay_key: str, - ) -> None: - await decrypt_file( - self.wrapper_decrypt_ip, - media_id, - fairplay_key, - input_path, - output_path, - ) - - async def decrypt_amdecrypt_hex( - self, - input_path: str, - output_path: str, - decryption_key: str, - legacy: bool = False, - ) -> None: - await decrypt_file_hex( - input_path, - output_path, - decryption_key, - legacy=legacy, - ) - - async def stage( - self, - encrypted_path: str, - staged_path: str, - decryption_key: DecryptionKeyAv, - legacy: bool, - media_id: str, - fairplay_key: str, - ): - if self.use_wrapper and not legacy: - await self.decrypt_amdecrypt( - encrypted_path, - staged_path, - media_id, - fairplay_key, - ) - else: - await self.decrypt_amdecrypt_hex( - encrypted_path, - staged_path, - decryption_key.audio_track.key, - legacy, - ) - - def get_lyrics_synced_path(self, final_path: str) -> str: - return str(Path(final_path).with_suffix("." + self.synced_lyrics_format.value)) - - def get_cover_path( - self, - final_path: str, - file_extension: str, - ) -> str: - return str(Path(final_path).parent / ("Cover" + file_extension)) - - def write_synced_lyrics( - self, - synced_lyrics: str, - lyrics_synced_path: str, - ): - Path(lyrics_synced_path).parent.mkdir(parents=True, exist_ok=True) - Path(lyrics_synced_path).write_text(synced_lyrics, encoding="utf8") - - async def download( - self, - download_item: DownloadItem, - ) -> None: - if self.synced_lyrics_only: - return - - encrypted_path = self.get_temp_path( - download_item.media_metadata["id"], - download_item.random_uuid, - "encrypted", - ".m4a", - ) - await self.download_stream( - download_item.stream_info.audio_track.stream_url, - encrypted_path, - ) - - await self.stage( - encrypted_path, - download_item.staged_path, - download_item.decryption_key, - download_item.stream_info.audio_track.legacy, - download_item.media_metadata["id"], - download_item.stream_info.audio_track.fairplay_key, - ) - - cover_bytes = ( - await self.interface.get_cover_bytes(download_item.cover_url) - if self.cover_format != CoverFormat.RAW - else None - ) - await self.apply_tags( - download_item.staged_path, - download_item.media_tags, - cover_bytes, - download_item.extra_tags, - ) diff --git a/gamdl/downloader/downloader_uploaded_video.py b/gamdl/downloader/downloader_uploaded_video.py deleted file mode 100644 index 49046da..0000000 --- a/gamdl/downloader/downloader_uploaded_video.py +++ /dev/null @@ -1,107 +0,0 @@ -from pathlib import Path - -from ..interface.enums import CoverFormat, UploadedVideoQuality -from ..interface.interface_uploaded_video import AppleMusicUploadedVideoInterface -from .downloader_base import AppleMusicBaseDownloader -from .types import DownloadItem - - -class AppleMusicUploadedVideoDownloader(AppleMusicBaseDownloader): - def __init__( - self, - base_downloader: AppleMusicBaseDownloader, - interface: AppleMusicUploadedVideoInterface, - quality: UploadedVideoQuality = UploadedVideoQuality.BEST, - ): - self.__dict__.update(base_downloader.__dict__) - self.interface = interface - self.quality = quality - - def get_cover_path(self, final_path: str, file_extension: str) -> str: - return str(Path(final_path).with_suffix(file_extension)) - - async def get_download_item( - self, - uploaded_video_metadata: dict, - ) -> DownloadItem: - try: - return await self._get_download_item( - uploaded_video_metadata, - ) - except Exception as e: - return DownloadItem( - media_metadata=uploaded_video_metadata, - error=e, - ) - - async def _get_download_item( - self, - uploaded_video_metadata: dict, - ) -> DownloadItem: - download_item = DownloadItem() - - download_item.media_metadata = uploaded_video_metadata - - download_item.media_tags = self.interface.get_tags( - uploaded_video_metadata, - ) - - download_item.stream_info = await self.interface.get_stream_info( - uploaded_video_metadata, - self.quality, - ) - - download_item.random_uuid = self.get_random_uuid() - download_item.staged_path = self.get_temp_path( - uploaded_video_metadata["id"], - download_item.random_uuid, - "staged", - "." + download_item.stream_info.file_format.value, - ) - download_item.final_path = self.get_final_path( - download_item.media_tags, - Path(download_item.staged_path).suffix, - None, - ) - - download_item.cover_url_template = self.interface.get_cover_url_template( - uploaded_video_metadata, - self.cover_format, - ) - download_item.cover_url = self.interface.get_cover_url( - download_item.cover_url_template, - self.cover_size, - self.cover_format, - ) - - cover_file_extension = await self.interface.get_cover_file_extension( - download_item.cover_url, - self.cover_format, - ) - if cover_file_extension: - download_item.cover_path = self.get_cover_path( - download_item.final_path, - cover_file_extension, - ) - - return download_item - - async def download( - self, - download_item: DownloadItem, - ) -> None: - await self.download_ytdlp( - download_item.stream_info.video_track.stream_url, - download_item.staged_path, - ) - - cover_bytes = ( - await self.interface.get_cover_bytes(download_item.cover_url) - if self.cover_format != CoverFormat.RAW - else None - ) - await self.apply_tags( - download_item.staged_path, - download_item.media_tags, - cover_bytes, - ) diff --git a/gamdl/downloader/enums.py b/gamdl/downloader/enums.py index d41636b..f7cc5eb 100644 --- a/gamdl/downloader/enums.py +++ b/gamdl/downloader/enums.py @@ -1,10 +1,5 @@ from enum import Enum -from .constants import ( - ARTIST_AUTO_SELECT_KEY_MAP, - ARTIST_AUTO_SELECT_STR_MAP, -) - class DownloadMode(Enum): YTDLP = "ytdlp" @@ -19,20 +14,3 @@ class RemuxMode(Enum): class RemuxFormatMusicVideo(Enum): M4V = "m4v" MP4 = "mp4" - - -class ArtistAutoSelect(Enum): - MAIN_ALBUMS = "main-albums" - COMPILATION_ALBUMS = "compilation-albums" - LIVE_ALBUMS = "live-albums" - SINGLES_EPS = "singles-eps" - ALL_ALBUMS = "all-albums" - TOP_SONGS = "top-songs" - MUSIC_VIDEOS = "music-videos" - - @property - def path_key(self) -> tuple[str, str]: - return ARTIST_AUTO_SELECT_KEY_MAP[self.value] - - def __str__(self) -> str: - return ARTIST_AUTO_SELECT_STR_MAP[self.value] diff --git a/gamdl/downloader/exceptions.py b/gamdl/downloader/exceptions.py index 83dc85b..4561b5c 100644 --- a/gamdl/downloader/exceptions.py +++ b/gamdl/downloader/exceptions.py @@ -1,31 +1,20 @@ from ..utils import GamdlError -class MediaFileExists(GamdlError): - def __init__(self, media_path: str): - super().__init__(f"Media file already exists at path: {media_path}") +class GamdlDownloaderError(GamdlError): + pass -class NotStreamable(GamdlError): - def __init__(self, media_id: str): - super().__init__(f"Media ID is not streamable: {media_id}") +class GamdlDownloaderSyncedLyricsOnlyError(GamdlDownloaderError): + def __init__(self) -> None: + super().__init__("Download mode is set to synced lyrics only") -class FormatNotAvailable(GamdlError): - def __init__(self, media_id: str): - super().__init__(f"Requested format is not available for media ID: {media_id}") +class GamdlDownloaderMediaFileExistsError(GamdlDownloaderError): + def __init__(self, file_path: str) -> None: + super().__init__(f"Media file already exists: {file_path}") -class ExecutableNotFound(GamdlError): - def __init__(self, executable: str): - super().__init__(f"Executable not found: {executable}") - - -class SyncedLyricsOnly(GamdlError): - def __init__(self): - super().__init__("Only downloading synced lyrics is supported") - - -class UnsupportedMediaType(GamdlError): - def __init__(self, media_type: str): - super().__init__(f"Unsupported media type: {media_type}") +class GamdlDownloaderDependencyNotFoundError(GamdlDownloaderError): + def __init__(self, dependency_name: str) -> None: + super().__init__(f"Required dependency not found: {dependency_name}") diff --git a/gamdl/downloader/hardcoded_wvd.py b/gamdl/downloader/hardcoded_wvd.py deleted file mode 100644 index a307b09..0000000 --- a/gamdl/downloader/hardcoded_wvd.py +++ /dev/null @@ -1,3 +0,0 @@ -# Dumped from Android Studio Virtual Device running Android 9 - -HARDCODED_WVD = """V1ZEAgIDAASoMIIEpAIBAAKCAQEAwnCFAPXy4U1J7p1NohAS+xl040f5FBaE/59bPp301bGz0UGFT9VoEtY3vaeakKh/d319xTNvCSWsEDRaMmp/wSnMiEZUkkl04872jx2uHuR4k6KYuuJoqhsIo1TwUBueFZynHBUJzXQeW8Eb1tYAROGwp8W7r+b0RIjHC89RFnfVXpYlF5I6McktyzJNSOwlQbMqlVihfSUkv3WRd3HFmA0Oxay51CEIkoTlNTHVlzVyhov5eHCDSp7QENRgaaQ03jC/CcgFOoQymhsBtRCM0CQmfuAHjA9e77R6m/GJPy75G9fqoZM1RMzVDHKbKZPd3sFd0c0+77gLzW8cWEaaHwIDAQABAoIBAQCB2pN46MikHvHZIcTPDt0eRQoDH/YArGl2Lf7J+sOgU2U7wv49KtCug9IGHwDiyyUVsAFmycrF2RroV45FTUq0vi2SdSXV7Kjb20Ren/vBNeQw9M37QWmU8Sj7q6YyWb9hv5T69DHvvDTqIjVtbM4RMojAAxYti5hmjNIh2PrWfVYWhXxCQ/WqAjWLtZBM6Oww1byfr5I/wFogAKkgHi8wYXZ4LnIC8V7jLAhujlToOvMMC9qwcBiPKDP2FO+CPSXaqVhH+LPSEgLggnU3EirihgxovbLNAuDEeEbRTyR70B0lW19tLHixso4ZQa7KxlVUwOmrHSZf7nVuWqPpxd+BAoGBAPQLyJ1IeRavmaU8XXxfMdYDoc8+xB7v2WaxkGXb6ToX1IWPkbMz4yyVGdB5PciIP3rLZ6s1+ruuRRV0IZ98i1OuN5TSR56ShCGg3zkd5C4L/xSMAz+NDfYSDBdO8BVvBsw21KqSRUi1ctL7QiIvfedrtGb5XrE4zhH0gjXlU5qZAoGBAMv2segn0Jx6az4rqRa2Y7zRx4iZ77JUqYDBI8WMnFeR54uiioTQ+rOs3zK2fGIWlrn4ohco/STHQSUTB8oCOFLMx1BkOqiR+UyebO28DJY7+V9ZmxB2Guyi7W8VScJcIdpSOPyJFOWZQKXdQFW3YICD2/toUx/pDAJh1sEVQsV3AoGBANyyp1rthmvoo5cVbymhYQ08vaERDwU3PLCtFXu4E0Ow90VNn6Ki4ueXcv/gFOp7pISk2/yuVTBTGjCblCiJ1en4HFWekJwrvgg3Vodtq8Okn6pyMCHRqvWEPqD5hw6rGEensk0K+FMXnF6GULlfn4mgEkYpb+PvDhSYvQSGfkPJAoGAF/bAKFqlM/1eJEvU7go35bNwEiij9Pvlfm8y2L8Qj2lhHxLV240CJ6IkBz1Rl+S3iNohkT8LnwqaKNT3kVB5daEBufxMuAmOlOX4PmZdxDj/r6hDg8ecmjj6VJbXt7JDd/c5ItKoVeGPqu035dpJyE+1xPAY9CLZel4scTsiQTkCgYBt3buRcZMwnc4qqpOOQcXK+DWD6QvpkcJ55ygHYw97iP/lF4euwdHd+I5b+11pJBAao7G0fHX3eSjqOmzReSKboSe5L8ZLB2cAI8AsKTBfKHWmCa8kDtgQuI86fUfirCGdhdA9AVP2QXN2eNCuPnFWi0WHm4fYuUB5be2c18ucxAb9CAESmgsK3QMIAhIQ071yBlsbLoO2CSB9Ds0cmRif6uevBiKOAjCCAQoCggEBAMJwhQD18uFNSe6dTaIQEvsZdONH+RQWhP+fWz6d9NWxs9FBhU/VaBLWN72nmpCof3d9fcUzbwklrBA0WjJqf8EpzIhGVJJJdOPO9o8drh7keJOimLriaKobCKNU8FAbnhWcpxwVCc10HlvBG9bWAEThsKfFu6/m9ESIxwvPURZ31V6WJReSOjHJLcsyTUjsJUGzKpVYoX0lJL91kXdxxZgNDsWsudQhCJKE5TUx1Zc1coaL+Xhwg0qe0BDUYGmkNN4wvwnIBTqEMpobAbUQjNAkJn7gB4wPXu+0epvxiT8u+RvX6qGTNUTM1QxymymT3d7BXdHNPu+4C81vHFhGmh8CAwEAASjwIkgBUqoBCAEQABqBAQQlRbfiBNDb6eU6aKrsH5WJaYszTioXjPLrWN9dqyW0vwfT11kgF0BbCGkAXew2tLJJqIuD95cjJvyGUSN6VyhL6dp44fWEGDSBIPR0mvRq7bMP+m7Y/RLKf83+OyVJu/BpxivQGC5YDL9f1/A8eLhTDNKXs4Ia5DrmTWdPTPBL8SIgyfUtg3ofI+/I9Tf7it7xXpT0AbQBJfNkcNXGpO3JcBMSgAIL5xsXK5of1mMwAl6ygN1Gsj4aZ052otnwN7kXk12SMsXheWTZ/PYh2KRzmt9RPS1T8hyFx/Kp5VkBV2vTAqqWrGw/dh4URqiHATZJUlhO7PN5m2Kq1LVFdXjWSzP5XBF2S83UMe+YruNHpE5GQrSyZcBqHO0QrdPcU35GBT7S7+IJr2AAXvnjqnb8yrtpPWN2ZW/IWUJN2z4vZ7/HV4aj3OZhkxC1DIMNyvsusUKoQQuf8gwKiEe8cFwbwFSicywlFk9la2IPe8oFShcxAzHLCCn/TIYUAvEL3/4LgaZvqWm80qCPYbgIP5HT8hPYkKWJ4WYknEWK+3InbnkzteFfGrQFCq4CCAESEGnj6Ji7LD+4o7MoHYT4jBQYjtW+kQUijgIwggEKAoIBAQDY9um1ifBRIOmkPtDZTqH+CZUBbb0eK0Cn3NHFf8MFUDzPEz+emK/OTub/hNxCJCao//pP5L8tRNUPFDrrvCBMo7Rn+iUb+mA/2yXiJ6ivqcN9Cu9i5qOU1ygon9SWZRsujFFB8nxVreY5Lzeq0283zn1Cg1stcX4tOHT7utPzFG/ReDFQt0O/GLlzVwB0d1sn3SKMO4XLjhZdncrtF9jljpg7xjMIlnWJUqxDo7TQkTytJmUl0kcM7bndBLerAdJFGaXc6oSY4eNy/IGDluLCQR3KZEQsy/mLeV1ggQ44MFr7XOM+rd+4/314q/deQbjHqjWFuVr8iIaKbq+R63ShAgMBAAEo8CISgAMii2Mw6z+Qs1bvvxGStie9tpcgoO2uAt5Zvv0CDXvrFlwnSbo+qR71Ru2IlZWVSbN5XYSIDwcwBzHjY8rNr3fgsXtSJty425djNQtF5+J2jrAhf3Q2m7EI5aohZGpD2E0cr+dVj9o8x0uJR2NWR8FVoVQSXZpad3M/4QzBLNto/tz+UKyZwa7Sc/eTQc2+ZcDS3ZEO3lGRsH864Kf/cEGvJRBBqcpJXKfG+ItqEW1AAPptjuggzmZEzRq5xTGf6or+bXrKjCpBS9G1SOyvCNF1k5z6lG8KsXhgQxL6ADHMoulxvUIihyPY5MpimdXfUdEQ5HA2EqNiNVNIO4qP007jW51yAeThOry4J22xs8RdkIClOGAauLIl0lLA4flMzW+VfQl5xYxP0E5tuhn0h+844DslU8ZF7U1dU2QprIApffXD9wgAACk26Rggy8e96z8i86/+YYyZQkc9hIdCAERrgEYCEbByzONrdRDs1MrS/ch1moV5pJv63BIKvQHGvLkaFwoMY29tcGFueV9uYW1lEgd1bmtub3duGioKCm1vZGVsX25hbWUSHEFuZHJvaWQgU0RLIGJ1aWx0IGZvciB4ODZfNjQaGwoRYXJjaGl0ZWN0dXJlX25hbWUSBng4Nl82NBodCgtkZXZpY2VfbmFtZRIOZ2VuZXJpY194ODZfNjQaIAoMcHJvZHVjdF9uYW1lEhBzZGtfcGhvbmVfeDg2XzY0GmMKCmJ1aWxkX2luZm8SVUFuZHJvaWQvc2RrX3Bob25lX3g4Nl82NC9nZW5lcmljX3g4Nl82NDo5L1BTUjEuMTgwNzIwLjAxMi80OTIzMjE0OnVzZXJkZWJ1Zy90ZXN0LWtleXMaHgoUd2lkZXZpbmVfY2RtX3ZlcnNpb24SBjE0LjAuMBokCh9vZW1fY3J5cHRvX3NlY3VyaXR5X3BhdGNoX2xldmVsEgEwMg4QASAAKA0wAEAASABQAA==""" diff --git a/gamdl/downloader/music_video.py b/gamdl/downloader/music_video.py new file mode 100644 index 0000000..6bb94dc --- /dev/null +++ b/gamdl/downloader/music_video.py @@ -0,0 +1,223 @@ +from pathlib import Path + +from ..interface.enums import CoverFormat +from ..interface.types import AppleMusicMedia, DecryptionKeyAv +from ..utils import async_subprocess +from .base import AppleMusicBaseDownloader +from .enums import RemuxFormatMusicVideo, RemuxMode +from .types import DownloadItem + + +class AppleMusicMusicVideoDownloader: + def __init__( + self, + base: AppleMusicBaseDownloader, + remux_mode: RemuxMode = RemuxMode.FFMPEG, + remux_format: RemuxFormatMusicVideo = RemuxFormatMusicVideo.M4V, + ): + self.base = base + self.remux_mode = remux_mode + self.remux_format = remux_format + + async def _remux_mp4box( + self, + input_path_video: str, + input_path_audio: str, + output_path: str, + ): + await async_subprocess( + self.base.full_mp4box_path, + "-quiet", + "-add", + input_path_audio, + "-add", + input_path_video, + "-itags", + "artist=placeholder", + "-keep-utc", + "-new", + output_path, + silent=self.base.silent, + ) + + async def _remux_ffmpeg( + self, + input_path_video: str, + input_path_audio: str, + output_path: str, + decryption_key: str = None, + ): + if decryption_key: + key = [ + "-decryption_key", + decryption_key, + ] + else: + key = [] + + await async_subprocess( + self.base.full_ffmpeg_path, + "-loglevel", + "error", + "-y", + *key, + "-i", + input_path_video, + "-i", + input_path_audio, + "-c", + "copy", + "-c:s", + "mov_text", + "-movflags", + "+faststart", + output_path, + silent=self.base.silent, + ) + + async def _decrypt_mp4decrypt( + self, + input_path: str, + output_path: str, + decryption_key: str, + ): + await async_subprocess( + self.base.full_mp4decrypt_path, + "--key", + f"1:{decryption_key}", + input_path, + output_path, + silent=self.base.silent, + ) + + async def stage( + self, + encrypted_path_video: str, + encrypted_path_audio: str, + decrypted_path_video: str, + decrypted_path_audio: str, + staged_path: str, + decryption_key: DecryptionKeyAv, + ): + await self._decrypt_mp4decrypt( + encrypted_path_video, + decrypted_path_video, + decryption_key.video_track.key, + ) + await self._decrypt_mp4decrypt( + encrypted_path_audio, + decrypted_path_audio, + decryption_key.audio_track.key, + ) + + if self.remux_mode == RemuxMode.MP4BOX: + await self._remux_mp4box( + decrypted_path_video, + decrypted_path_audio, + staged_path, + ) + else: + await self._remux_ffmpeg( + decrypted_path_video, + decrypted_path_audio, + staged_path, + ) + + def get_cover_path( + self, + final_path: str, + file_extension: str, + ) -> str: + return str(Path(final_path).with_suffix(file_extension)) + + async def get_download_item( + self, + media: AppleMusicMedia, + ) -> DownloadItem: + download_item = DownloadItem(media) + + download_item.staged_path = self.base.get_temp_path( + media.media_metadata["id"], + download_item.uuid_, + "staged", + "." + media.stream_info.file_format.value, + ) + + download_item.final_path = self.base.get_final_path( + media.tags, + "." + media.stream_info.file_format.value, + media.playlist_tags, + ) + + if media.playlist_tags: + download_item.playlist_file_path = self.base.get_playlist_file_path( + media.playlist_tags, + ) + + download_item.cover_path = self.get_cover_path( + download_item.final_path, + media.cover.file_extension, + ) + + return download_item + + async def download( + self, + download_item: DownloadItem, + ) -> None: + encrypted_path_video = self.base.get_temp_path( + download_item.media.media_metadata["id"], + download_item.uuid_, + "encrypted_video", + ".mp4", + ) + encrypted_path_audio = self.base.get_temp_path( + download_item.media.media_metadata["id"], + download_item.uuid_, + "encrypted_audio", + ".m4a", + ) + + await self.base.download_stream( + download_item.media.stream_info.video_track.stream_url, + encrypted_path_video, + ) + await self.base.download_stream( + download_item.media.stream_info.audio_track.stream_url, + encrypted_path_audio, + ) + + decrypted_path_video = self.base.get_temp_path( + download_item.media.media_metadata["id"], + download_item.uuid_, + "decrypted_video", + ".mp4", + ) + decrypted_path_audio = self.base.get_temp_path( + download_item.media.media_metadata["id"], + download_item.uuid_, + "decrypted_audio", + ".m4a", + ) + + await self.stage( + encrypted_path_video, + encrypted_path_audio, + decrypted_path_video, + decrypted_path_audio, + download_item.staged_path, + download_item.media.decryption_key, + ) + + cover_bytes = ( + await self.base.interface.base.get_cover_bytes( + download_item.media.cover.url + ) + if self.base.interface.base.cover_format != CoverFormat.RAW + else None + ) + await self.base.apply_tags( + download_item.staged_path, + download_item.media.tags, + cover_bytes, + ) diff --git a/gamdl/downloader/song.py b/gamdl/downloader/song.py new file mode 100644 index 0000000..39629e3 --- /dev/null +++ b/gamdl/downloader/song.py @@ -0,0 +1,180 @@ +from pathlib import Path + +import structlog + +from ..interface.enums import CoverFormat +from ..interface.types import AppleMusicMedia, DecryptionKeyAv +from .amdecrypt import decrypt_file, decrypt_file_hex +from .base import AppleMusicBaseDownloader +from .types import DownloadItem + +logger = structlog.get_logger(__name__) + + +class AppleMusicSongDownloader: + def __init__( + self, + base: AppleMusicBaseDownloader, + ): + self.base = base + + async def get_download_item(self, media: AppleMusicMedia) -> DownloadItem: + download_item = DownloadItem(media) + + download_item.staged_path = self.base.get_temp_path( + media.media_metadata["id"], + download_item.uuid_, + "staged", + "." + media.stream_info.file_format.value, + ) + + download_item.final_path = self.base.get_final_path( + media.tags, + ".m4a", + media.playlist_tags, + ) + + if media.playlist_tags: + download_item.playlist_file_path = self.base.get_playlist_file_path( + media.playlist_tags, + ) + + download_item.synced_lyrics_path = self.get_synced_lyrics_path( + download_item.final_path + ) + + download_item.cover_path = self.get_cover_path( + download_item.final_path, + media.cover.file_extension, + ) + + return download_item + + async def _decrypt_amdecrypt( + self, + input_path: str, + output_path: str, + media_id: str, + fairplay_key: str, + ) -> None: + await decrypt_file( + self.base.wrapper_decrypt_ip, + media_id, + fairplay_key, + input_path, + output_path, + ) + + async def _decrypt_amdecrypt_hex( + self, + input_path: str, + output_path: str, + decryption_key: str, + legacy: bool = False, + ) -> None: + await decrypt_file_hex( + input_path, + output_path, + decryption_key, + legacy=legacy, + ) + + async def stage( + self, + encrypted_path: str, + staged_path: str, + decryption_key: DecryptionKeyAv, + legacy: bool, + media_id: str, + fairplay_key: str, + ): + log = logger.bind( + action="stage_song", + media_id=media_id, + encrypted_path=encrypted_path, + staged_path=staged_path, + ) + + if self.base.use_wrapper and not legacy: + await self._decrypt_amdecrypt( + encrypted_path, + staged_path, + media_id, + fairplay_key, + ) + else: + await self._decrypt_amdecrypt_hex( + encrypted_path, + staged_path, + decryption_key.audio_track.key, + legacy, + ) + + log.debug("success") + + def get_synced_lyrics_path(self, final_path: str) -> str: + log = logger.bind(action="get_synced_lyrics_path", final_path=final_path) + + synced_lyrics_path = str( + Path(final_path).with_suffix( + "." + self.base.interface.song.synced_lyrics_format.value + ) + ) + + log.debug("success", synced_lyrics_path=synced_lyrics_path) + + return synced_lyrics_path + + def get_cover_path( + self, + final_path: str, + file_extension: str, + ) -> str: + log = logger.bind( + action="get_song_cover_path", + final_path=final_path, + file_extension=file_extension, + ) + + cover_path = str(Path(final_path).parent / ("Cover" + file_extension)) + + log.debug("success", cover_path=cover_path) + + return cover_path + + async def download( + self, + download_item: DownloadItem, + ) -> None: + encrypted_path = self.base.get_temp_path( + download_item.media.media_metadata["id"], + download_item.uuid_, + "encrypted", + ".m4a", + ) + await self.base.download_stream( + download_item.media.stream_info.audio_track.stream_url, + encrypted_path, + ) + + await self.stage( + encrypted_path, + download_item.staged_path, + download_item.media.decryption_key, + download_item.media.stream_info.audio_track.legacy, + download_item.media.media_metadata["id"], + download_item.media.stream_info.audio_track.fairplay_key, + ) + + cover_bytes = ( + await self.base.interface.base.get_cover_bytes( + download_item.media.cover.url + ) + if self.base.interface.base.cover_format != CoverFormat.RAW + else None + ) + await self.base.apply_tags( + download_item.staged_path, + download_item.media.tags, + cover_bytes, + ) diff --git a/gamdl/downloader/types.py b/gamdl/downloader/types.py index 5990d10..4b32f3d 100644 --- a/gamdl/downloader/types.py +++ b/gamdl/downloader/types.py @@ -1,44 +1,15 @@ +import uuid from dataclasses import dataclass -from typing import Any -from ..interface.types import ( - DecryptionKeyAv, - Lyrics, - MediaTags, - PlaylistTags, - StreamInfoAv, -) +from ..interface.types import AppleMusicMedia @dataclass class DownloadItem: - media_metadata: dict = None - playlist_metadata: dict = None - random_uuid: str = None - lyrics: Lyrics = None - media_tags: MediaTags = None - extra_tags: dict = None - playlist_tags: PlaylistTags = None - stream_info: StreamInfoAv = None - decryption_key: DecryptionKeyAv = None - cover_url_template: str = None - cover_url: str = None + media: AppleMusicMedia + uuid_: str = uuid.uuid4().hex[:8] staged_path: str = None final_path: str = None playlist_file_path: str = None synced_lyrics_path: str = None cover_path: str = None - flat_filter_result: Any = None - error: Exception = None - - -@dataclass -class UrlInfo: - storefront: str = None - type: str = None - slug: str = None - id: str = None - sub_id: str = None - library_storefront: str = None - library_type: str = None - library_id: str = None diff --git a/gamdl/downloader/uploaded_video.py b/gamdl/downloader/uploaded_video.py new file mode 100644 index 0000000..7e02900 --- /dev/null +++ b/gamdl/downloader/uploaded_video.py @@ -0,0 +1,65 @@ +from pathlib import Path + +from ..interface.enums import CoverFormat +from ..interface.types import AppleMusicMedia +from .base import AppleMusicBaseDownloader +from .types import DownloadItem + + +class AppleMusicUploadedVideoDownloader: + def __init__( + self, + base: AppleMusicBaseDownloader, + ): + self.base = base + + def get_cover_path(self, final_path: str, file_extension: str) -> str: + return str(Path(final_path).with_suffix(file_extension)) + + async def get_download_item( + self, + media: AppleMusicMedia, + ) -> DownloadItem: + download_item = DownloadItem(media) + + download_item.staged_path = self.base.get_temp_path( + media.media_metadata["id"], + download_item.uuid_, + "staged", + "." + media.stream_info.file_format.value, + ) + + download_item.final_path = self.base.get_final_path( + media.tags, + "." + media.stream_info.file_format.value, + media.playlist_tags, + ) + + download_item.cover_path = self.get_cover_path( + download_item.final_path, + media.cover.file_extension, + ) + + return download_item + + async def download( + self, + download_item: DownloadItem, + ) -> None: + await self.base._download_ytdlp_async( + download_item.media.stream_info.video_track.stream_url, + download_item.staged_path, + ) + + cover_bytes = ( + await self.base.interface.base.get_cover_bytes( + download_item.media.cover.url + ) + if self.base.interface.base.cover_format != CoverFormat.RAW + else None + ) + await self.base.apply_tags( + download_item.staged_path, + download_item.media.tags, + cover_bytes, + )