Refactor downloader module

This commit is contained in:
Rafael Moraes
2026-04-20 11:56:32 -03:00
parent 939e9459ef
commit f9d62ee84b
14 changed files with 764 additions and 1455 deletions
+5 -4
View File
@@ -1,8 +1,9 @@
from .amdecrypt import decrypt_file, decrypt_file_hex
from .base import AppleMusicBaseDownloader
from .downloader import AppleMusicDownloader
from .downloader_base import AppleMusicBaseDownloader
from .downloader_music_video import AppleMusicMusicVideoDownloader
from .downloader_song import AppleMusicSongDownloader
from .downloader_uploaded_video import AppleMusicUploadedVideoDownloader
from .enums import *
from .exceptions import *
from .music_video import AppleMusicMusicVideoDownloader
from .song import AppleMusicSongDownloader
from .types import *
from .uploaded_video import AppleMusicUploadedVideoDownloader
@@ -1,30 +1,28 @@
import asyncio
import re
import shutil
import uuid
from pathlib import Path
import structlog
from mutagen.mp4 import MP4, MP4Cover
from pywidevine import Cdm, Device
from yt_dlp import YoutubeDL
from ..interface.enums import CoverFormat
from ..interface.interface import AppleMusicInterface
from ..interface.types import MediaTags, PlaylistTags
from ..utils import CustomStringFormatter, async_subprocess
from .constants import ILLEGAL_CHAR_REPLACEMENT, ILLEGAL_CHARS_RE, TEMP_PATH_TEMPLATE
from .enums import DownloadMode
from .hardcoded_wvd import HARDCODED_WVD
logger = structlog.get_logger(__name__)
class AppleMusicBaseDownloader:
def __init__(
self,
interface: AppleMusicInterface,
output_path: str = "./Apple Music",
temp_path: str = ".",
wvd_path: str = None,
overwrite: bool = False,
save_cover: bool = False,
save_playlist: bool = False,
nm3u8dlre_path: str = "N_m3u8DL-RE",
mp4decrypt_path: str = "mp4decrypt",
ffmpeg_path: str = "ffmpeg",
@@ -32,26 +30,22 @@ class AppleMusicBaseDownloader:
use_wrapper: bool = False,
wrapper_decrypt_ip: str = "127.0.0.1:10020",
download_mode: DownloadMode = DownloadMode.YTDLP,
cover_format: CoverFormat = CoverFormat.JPG,
album_folder_template: str = "{album_artist}/{album}",
compilation_folder_template: str = "Compilations/{album}",
no_album_folder_template: str = "{artist}/Unknown Album",
playlist_folder_template: str = "Playlists/{playlist_artist}",
single_disc_file_template: str = "{track:02d} {title}",
multi_disc_file_template: str = "{disc}-{track:02d} {title}",
no_album_file_template: str = "{title}",
playlist_file_template: str = "Playlists/{playlist_artist}/{playlist_title}",
playlist_file_template: str = "{playlist_title}",
date_tag_template: str = "%Y-%m-%dT%H:%M:%SZ",
exclude_tags: list[str] = None,
cover_size: int = 1200,
truncate: int = None,
silent: bool = False,
):
self.interface = interface
self.output_path = output_path
self.temp_path = temp_path
self.wvd_path = wvd_path
self.overwrite = overwrite
self.save_cover = save_cover
self.save_playlist = save_playlist
self.nm3u8dlre_path = nm3u8dlre_path
self.mp4decrypt_path = mp4decrypt_path
self.ffmpeg_path = ffmpeg_path
@@ -59,64 +53,35 @@ class AppleMusicBaseDownloader:
self.use_wrapper = use_wrapper
self.wrapper_decrypt_ip = wrapper_decrypt_ip
self.download_mode = download_mode
self.cover_format = cover_format
self.album_folder_template = album_folder_template
self.compilation_folder_template = compilation_folder_template
self.no_album_folder_template = no_album_folder_template
self.single_disc_file_template = single_disc_file_template
self.multi_disc_file_template = multi_disc_file_template
self.playlist_folder_template = playlist_folder_template
self.no_album_file_template = no_album_file_template
self.playlist_file_template = playlist_file_template
self.date_tag_template = date_tag_template
self.exclude_tags = exclude_tags
self.cover_size = cover_size
self.truncate = truncate
self.silent = silent
self.initialize()
def initialize(self):
self._initialize_binary_paths()
self._initialize_cdm()
def _initialize_binary_paths(self):
log = logger.bind(action="initialize_binary_paths")
self.full_nm3u8dlre_path = shutil.which(self.nm3u8dlre_path)
self.full_mp4decrypt_path = shutil.which(self.mp4decrypt_path)
self.full_ffmpeg_path = shutil.which(self.ffmpeg_path)
self.full_mp4box_path = shutil.which(self.mp4box_path)
def _initialize_cdm(self):
if self.wvd_path:
self.cdm = Cdm.from_device(Device.load(self.wvd_path))
else:
self.cdm = Cdm.from_device(Device.loads(HARDCODED_WVD))
self.cdm.MAX_NUM_OF_SESSIONS = float("inf")
def get_random_uuid(self) -> str:
return uuid.uuid4().hex[:8]
def is_media_streamable(
self,
media_metadata: dict,
) -> bool:
return bool(media_metadata["attributes"].get("playParams"))
def get_playlist_tags(
self,
playlist_metadata: dict,
media_metadata: dict,
) -> PlaylistTags:
playlist_track = (
playlist_metadata["relationships"]["tracks"]["data"].index(media_metadata)
+ 1
)
return PlaylistTags(
playlist_artist=playlist_metadata["attributes"].get(
"curatorName", "Unknown"
),
playlist_id=playlist_metadata["attributes"]["playParams"]["id"],
playlist_title=playlist_metadata["attributes"]["name"],
playlist_track=playlist_track,
log = log.debug(
"success",
full_nm3u8dlre_path=self.full_nm3u8dlre_path,
full_mp4decrypt_path=self.full_mp4decrypt_path,
full_ffmpeg_path=self.full_ffmpeg_path,
full_mp4box_path=self.full_mp4box_path,
)
def get_temp_path(
@@ -126,13 +91,19 @@ class AppleMusicBaseDownloader:
file_tag: str,
file_extension: str,
) -> str:
return str(
log = logger.bind(action="get_temp_path")
temp_path = str(
Path(self.temp_path)
/ TEMP_PATH_TEMPLATE.format(folder_tag)
/ (f"{media_id}_{file_tag}" + file_extension)
)
def sanitize_string(
log.debug("success", temp_path=temp_path)
return temp_path
def _sanitize_string(
self,
dirty_string: str,
file_ext: str = None,
@@ -160,6 +131,8 @@ class AppleMusicBaseDownloader:
file_extension: str,
playlist_tags: PlaylistTags | None,
) -> str:
log = logger.bind(action="get_final_path")
if tags.album:
template_folder_parts = (
self.compilation_folder_template.split("/")
@@ -217,29 +190,39 @@ class AppleMusicBaseDownloader:
track=(tags.track, ""),
track_total=(tags.track_total, ""),
)
sanitized_formatted_part = self.sanitize_string(
sanitized_formatted_part = self._sanitize_string(
formatted_part,
file_extension if not is_folder else None,
)
formatted_parts.append(sanitized_formatted_part)
return str(Path(self.output_path, *formatted_parts))
final_path = str(Path(self.output_path, *formatted_parts))
log.debug("success", final_path=final_path)
return final_path
async def download_stream(self, stream_url: str, download_path: str):
log = logger.bind(
action="download_stream", stream_url=stream_url, download_path=download_path
)
if self.download_mode == DownloadMode.YTDLP:
await self.download_ytdlp(stream_url, download_path)
await self._download_ytdlp_async(stream_url, download_path)
if self.download_mode == DownloadMode.NM3U8DLRE:
await self.download_nm3u8dlre(stream_url, download_path)
await self._download_nm3u8dlre(stream_url, download_path)
async def download_ytdlp(self, stream_url: str, download_path: str) -> None:
log.debug("success")
async def _download_ytdlp_async(self, stream_url: str, download_path: str) -> None:
await asyncio.to_thread(
self._download_ytdlp,
self._download_ytdlp_sync,
stream_url,
download_path,
)
def _download_ytdlp(self, stream_url: str, download_path: str) -> None:
def _download_ytdlp_sync(self, stream_url: str, download_path: str) -> None:
with YoutubeDL(
{
"quiet": True,
@@ -254,7 +237,7 @@ class AppleMusicBaseDownloader:
) as ydl:
ydl.download(stream_url)
async def download_nm3u8dlre(self, stream_url: str, download_path: str):
async def _download_nm3u8dlre(self, stream_url: str, download_path: str):
download_path_obj = Path(download_path)
download_path_obj.parent.mkdir(parents=True, exist_ok=True)
@@ -278,11 +261,12 @@ class AppleMusicBaseDownloader:
async def apply_tags(
self,
media_path: Path,
media_path: str,
tags: MediaTags,
cover_bytes: bytes | None,
extra_tags: dict | None = None,
):
log = logger.bind(action="apply_tags", media_path=media_path)
exclude_tags = self.exclude_tags or []
filtered_tags = MediaTags(
@@ -297,21 +281,21 @@ class AppleMusicBaseDownloader:
skip_tagging = "all" in exclude_tags
await asyncio.to_thread(
self.apply_mp4_tags,
self._apply_mp4_tags,
media_path,
mp4_tags,
cover_bytes,
skip_tagging,
extra_tags,
)
def apply_mp4_tags(
log.debug("success")
def _apply_mp4_tags(
self,
media_path: Path,
media_path: str,
tags: dict,
cover_bytes: bytes | None,
skip_tagging: bool,
extra_tags: dict | None,
):
mp4 = MP4(media_path)
mp4.clear()
@@ -323,14 +307,12 @@ class AppleMusicBaseDownloader:
data=cover_bytes,
imageformat=(
MP4Cover.FORMAT_JPEG
if self.cover_format == CoverFormat.JPG
if self.interface.base.cover_format == CoverFormat.JPG
else MP4Cover.FORMAT_PNG
),
)
]
mp4.update(tags)
if extra_tags:
mp4.update(extra_tags)
mp4.save()
@@ -347,82 +329,41 @@ class AppleMusicBaseDownloader:
data=cover_bytes,
imageformat=(
MP4Cover.FORMAT_JPEG
if self.cover_format == CoverFormat.JPG
if self.interface.base.cover_format == CoverFormat.JPG
else MP4Cover.FORMAT_PNG
),
)
]
def move_to_final_path(self, stage_path: str, final_path: str) -> None:
Path(final_path).parent.mkdir(parents=True, exist_ok=True)
shutil.move(stage_path, final_path)
def write_cover_image(
self,
cover_bytes: bytes,
cover_path: str,
) -> None:
Path(cover_path).parent.mkdir(parents=True, exist_ok=True)
Path(cover_path).write_bytes(cover_bytes)
def get_playlist_file_path(
self,
tags: PlaylistTags,
) -> str:
log = logger.bind(action="get_playlist_file_path")
template_folder_parts = self.playlist_folder_template.split("/")
template_file_parts = self.playlist_file_template.split("/")
template_parts = template_folder_parts + template_file_parts
formatted_parts = []
for i, part in enumerate(template_file_parts):
is_folder = i < len(template_file_parts) - 1
for i, part in enumerate(template_parts):
is_folder = i < len(template_parts) - 1
formatted_part = CustomStringFormatter().format(
part,
playlist_artist=(tags.playlist_artist, "Unknown Playlist Artist"),
playlist_artist=(tags.artist, "Unknown Playlist Artist"),
playlist_id=(tags.playlist_id, "Unknown Playlist ID"),
playlist_title=(tags.playlist_title, "Unknown Playlist Title"),
playlist_track=(tags.playlist_track, ""),
playlist_title=(tags.title, "Unknown Playlist Title"),
playlist_track=(tags.track, ""),
)
file_ext = None if is_folder else ".m3u8"
sanitized_formatted_part = self.sanitize_string(
file_ext = None if is_folder else ".m3u"
sanitized_formatted_part = self._sanitize_string(
formatted_part,
file_ext,
)
formatted_parts.append(sanitized_formatted_part)
return str(Path(self.output_path, *formatted_parts))
final_path = str(Path(self.output_path, *formatted_parts))
def update_playlist_file(
self,
playlist_file_path: str,
final_path: str,
playlist_track: int,
) -> None:
playlist_file_path_obj = Path(playlist_file_path)
final_path_obj = Path(final_path)
output_dir_obj = Path(self.output_path)
log.debug("success", playlist_file_path=final_path)
playlist_file_path_obj.parent.mkdir(parents=True, exist_ok=True)
playlist_file_path_parent_parts_len = len(playlist_file_path_obj.parent.parts)
output_path_parts_len = len(output_dir_obj.parts)
final_path_relative = Path(
("../" * (playlist_file_path_parent_parts_len - output_path_parts_len)),
*final_path_obj.parts[output_path_parts_len:],
)
playlist_file_lines = (
playlist_file_path_obj.open("r", encoding="utf8").readlines()
if playlist_file_path_obj.exists()
else []
)
if len(playlist_file_lines) < playlist_track:
playlist_file_lines.extend(
"\n" for _ in range(playlist_track - len(playlist_file_lines))
)
playlist_file_lines[playlist_track - 1] = final_path_relative.as_posix() + "\n"
with playlist_file_path_obj.open("w", encoding="utf8") as playlist_file:
playlist_file.writelines(playlist_file_lines)
def cleanup_temp(self, random_uuid: str) -> None:
temp_folder = Path(self.temp_path) / TEMP_PATH_TEMPLATE.format(random_uuid)
if temp_folder.exists():
shutil.rmtree(temp_folder)
return final_path
-43
View File
@@ -1,46 +1,3 @@
import re
TEMP_PATH_TEMPLATE = "gamdl_temp_{}"
ILLEGAL_CHARS_RE = r'[\\/:*?"<>|;]'
ILLEGAL_CHAR_REPLACEMENT = "_"
SONG_MEDIA_TYPE = {"song", "songs", "library-songs"}
ALBUM_MEDIA_TYPE = {"album", "albums", "library-albums"}
MUSIC_VIDEO_MEDIA_TYPE = {"music-video", "music-videos", "library-music-videos"}
ARTIST_MEDIA_TYPE = {"artist", "artists", "library-artists"}
UPLOADED_VIDEO_MEDIA_TYPE = {"post", "uploaded-videos"}
PLAYLIST_MEDIA_TYPE = {"playlist", "playlists", "library-playlists"}
ARTIST_AUTO_SELECT_KEY_MAP = {
"main-albums": ("views", "full-albums"),
"compilation-albums": ("views", "compilation-albums"),
"live-albums": ("views", "live-albums"),
"singles-eps": ("views", "singles"),
"all-albums": ("relationships", "albums"),
"top-songs": ("views", "top-songs"),
"music-videos": ("relationships", "music-videos"),
}
ARTIST_AUTO_SELECT_STR_MAP = {
"main-albums": "Main Albums",
"compilation-albums": "Compilation Albums",
"live-albums": "Live Albums",
"singles-eps": "Singles & EPs",
"all-albums": "All Albums",
"top-songs": "Top Songs",
"music-videos": "Music Videos",
}
VALID_URL_PATTERN = re.compile(
r"https://(?:classical\.)?music\.apple\.com"
r"(?:"
r"/(?P<storefront>[a-z]{2})"
r"/(?P<type>artist|album|playlist|song|music-video|post)"
r"(?:/(?P<slug>[^\s/]+))?"
r"/(?P<id>[0-9]+|pl\.[0-9a-z]{32}|pl\.u-[a-zA-Z0-9]+)"
r"(?:\?i=(?P<sub_id>[0-9]+))?"
r"|"
r"(?:/(?P<library_storefront>[a-z]{2}))?"
r"/library/(?P<library_type>playlist|albums)"
r"/(?P<library_id>p\.[a-zA-Z0-9]+|l\.[a-zA-Z0-9]+)"
r")"
)
+207 -560
View File
@@ -1,617 +1,264 @@
import asyncio
import typing
from pathlib import Path
from typing import AsyncGenerator
import structlog
import shutil
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from ..api.exceptions import ApiError
from ..interface import AppleMusicInterface
from ..utils import safe_gather
from .constants import (
ALBUM_MEDIA_TYPE,
ARTIST_MEDIA_TYPE,
MUSIC_VIDEO_MEDIA_TYPE,
PLAYLIST_MEDIA_TYPE,
SONG_MEDIA_TYPE,
UPLOADED_VIDEO_MEDIA_TYPE,
VALID_URL_PATTERN,
)
from .downloader_base import AppleMusicBaseDownloader
from .downloader_music_video import AppleMusicMusicVideoDownloader
from .downloader_song import AppleMusicSongDownloader
from .downloader_uploaded_video import AppleMusicUploadedVideoDownloader
from .base import AppleMusicBaseDownloader
from .enums import ArtistAutoSelect, DownloadMode, RemuxMode
from .exceptions import (
ExecutableNotFound,
FormatNotAvailable,
MediaFileExists,
NotStreamable,
SyncedLyricsOnly,
UnsupportedMediaType,
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
)
from .types import DownloadItem, UrlInfo
from .music_video import AppleMusicMusicVideoDownloader
from .song import AppleMusicSongDownloader
from .types import DownloadItem
from .constants import TEMP_PATH_TEMPLATE
from .uploaded_video import AppleMusicUploadedVideoDownloader
logger = structlog.get_logger(__name__)
class AppleMusicDownloader:
def __init__(
self,
interface: AppleMusicInterface,
base_downloader: AppleMusicBaseDownloader,
song_downloader: AppleMusicSongDownloader,
music_video_downloader: AppleMusicMusicVideoDownloader,
uploaded_video_downloader: AppleMusicUploadedVideoDownloader,
artist_auto_select: ArtistAutoSelect | None = None,
skip_music_videos: bool = False,
song: AppleMusicSongDownloader,
music_video: AppleMusicMusicVideoDownloader,
uploaded_video: AppleMusicUploadedVideoDownloader,
overwrite: bool = False,
save_cover: bool = False,
save_playlist: bool = False,
no_synced_lyrics: bool = False,
synced_lyrics_only: bool = False,
skip_cleanup: bool = False,
skip_processing: bool = False,
flat_filter: typing.Callable = None,
):
self.interface = interface
self.base_downloader = base_downloader
self.song_downloader = song_downloader
self.music_video_downloader = music_video_downloader
self.uploaded_video_downloader = uploaded_video_downloader
self.artist_auto_select = artist_auto_select
self.skip_music_videos = skip_music_videos
self.song = song
self.music_video = music_video
self.uploaded_video = uploaded_video
self.overwrite = overwrite
self.save_cover = save_cover
self.save_playlist = save_playlist
self.no_synced_lyrics = no_synced_lyrics
self.synced_lyrics_only = synced_lyrics_only
self.skip_cleanup = skip_cleanup
self.skip_processing = skip_processing
self.flat_filter = flat_filter
async def get_single_download_item(
self.base = song.base
async def get_download_item_from_url(
self,
media_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
if self.flat_filter:
flat_filter_result = self.flat_filter(media_metadata)
if asyncio.iscoroutine(flat_filter_result):
flat_filter_result = await flat_filter_result
url: str,
) -> AsyncGenerator[DownloadItem, None]:
async for media in self.base.interface.get_media_from_url(url):
if media.error or media.flat_filter_result:
yield DownloadItem(media)
if flat_filter_result:
return DownloadItem(
media_metadata=media_metadata,
playlist_metadata=playlist_metadata,
flat_filter_result=flat_filter_result,
)
elif media.media_metadata["type"] in {"songs", "library-songs"}:
yield await self.song.get_download_item(media)
return await self.get_single_download_item_no_filter(
media_metadata,
playlist_metadata,
)
elif media.media_metadata["type"] in {
"music-videos",
"library-music-videos",
}:
yield await self.music_video.get_download_item(media)
async def get_single_download_item_no_filter(
self,
media_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
elif media.media_metadata["type"] in {"uploaded-videos"}:
yield await self.uploaded_video.get_download_item(media)
async def download(self, item: DownloadItem) -> None:
try:
if not self.base_downloader.is_media_streamable(
media_metadata,
):
raise NotStreamable(media_metadata["id"])
if item.media.error:
raise item.media.error
if media_metadata["type"] in SONG_MEDIA_TYPE:
if not self.song_downloader:
raise UnsupportedMediaType(media_metadata["type"])
download_item = await self.song_downloader.get_download_item(
media_metadata,
playlist_metadata,
)
if media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE:
if not self.music_video_downloader:
raise UnsupportedMediaType(media_metadata["type"])
download_item = await self.music_video_downloader.get_download_item(
media_metadata,
playlist_metadata,
)
if media_metadata["type"] in UPLOADED_VIDEO_MEDIA_TYPE:
if not self.uploaded_video_downloader:
raise UnsupportedMediaType(media_metadata["type"])
download_item = await self.uploaded_video_downloader.get_download_item(
media_metadata,
)
except Exception as e:
download_item = DownloadItem(
media_metadata=media_metadata,
playlist_metadata=playlist_metadata,
error=e,
)
return download_item
async def get_collection_download_items(
self,
collection_metadata: dict,
) -> list[DownloadItem]:
tracks_metadata = collection_metadata["relationships"]["tracks"]["data"]
async for extended_data in self.interface.apple_music_api.extend_api_data(
collection_metadata["relationships"]["tracks"],
):
tracks_metadata.extend(extended_data["data"])
tasks = [
self.get_single_download_item(
media_metadata,
(
collection_metadata
if collection_metadata["type"] in PLAYLIST_MEDIA_TYPE
else None
),
)
for media_metadata in tracks_metadata
]
download_items = await safe_gather(*tasks)
return download_items
async def get_artist_download_items(
self,
artist_metadata: dict,
) -> list[DownloadItem]:
if not self.artist_auto_select:
available_choices = []
for artist_auto_select_option in list(ArtistAutoSelect):
relation_key, type_key = artist_auto_select_option.path_key
available_choices.append(
Choice(
name=str(artist_auto_select_option),
value=(artist_auto_select_option,),
),
)
(artist_auto_select,) = await inquirer.select(
message=f'Select which type to download for artist "{artist_metadata["attributes"]["name"]}":',
choices=available_choices,
validate=lambda result: artist_metadata.get(result[0].path_key[0], {})
.get(result[0].path_key[1], {})
.get("data"),
).execute_async()
else:
artist_auto_select = self.artist_auto_select
relation_key, type_key = artist_auto_select.path_key
async for extended_data in self.interface.apple_music_api.extend_api_data(
artist_metadata[relation_key][type_key],
):
artist_metadata[relation_key][type_key]["data"].extend(extended_data["data"])
selected_items = artist_metadata[relation_key][type_key]["data"]
select_all = self.artist_auto_select is not None
if artist_auto_select in {
ArtistAutoSelect.MAIN_ALBUMS,
ArtistAutoSelect.COMPILATION_ALBUMS,
ArtistAutoSelect.LIVE_ALBUMS,
ArtistAutoSelect.SINGLES_EPS,
ArtistAutoSelect.ALL_ALBUMS,
}:
return await self.get_artist_albums_download_items(
selected_items,
select_all,
)
elif artist_auto_select == ArtistAutoSelect.TOP_SONGS:
return await self.get_artist_songs_download_items(
selected_items,
select_all,
)
elif artist_auto_select == ArtistAutoSelect.MUSIC_VIDEOS:
return await self.get_artist_music_videos_download_items(
selected_items,
select_all,
)
async def get_artist_albums_download_items(
self,
albums_metadata: list[dict],
select_all: bool = False,
) -> list[DownloadItem]:
if not select_all:
choices = [
Choice(
name=" | ".join(
[
f'{album["attributes"]["trackCount"]:03d}',
f'{album["attributes"]["releaseDate"]:<10}',
f'{album["attributes"].get("contentRating", "None").title():<8}',
f'{album["attributes"]["name"]}',
]
),
value=album,
)
for album in albums_metadata
if album.get("attributes")
]
selected = await inquirer.select(
message="Select which albums to download: (Track Count | Release Date | Rating | Title)",
choices=choices,
multiselect=True,
).execute_async()
else:
selected = albums_metadata
download_items = []
album_tasks = [
self.interface.apple_music_api.get_album(album_metadata["id"])
for album_metadata in selected
]
album_responses = await safe_gather(*album_tasks)
track_tasks = [
self.get_collection_download_items(album_response["data"][0])
for album_response in album_responses
]
track_results = await safe_gather(*track_tasks)
for track_result in track_results:
download_items.extend(track_result)
return download_items
async def get_artist_music_videos_download_items(
self,
music_videos_metadata: list[dict],
select_all: bool = False,
) -> list[DownloadItem]:
if not select_all:
choices = [
Choice(
name=" | ".join(
[
self.millis_to_min_sec(
music_video["attributes"]["durationInMillis"]
),
f'{music_video["attributes"].get("contentRating", "None").title():<8}',
music_video["attributes"]["name"],
],
),
value=music_video,
)
for music_video in music_videos_metadata
if music_video.get("attributes")
]
selected = await inquirer.select(
message="Select which music videos to download: (Duration | Rating | Title)",
choices=choices,
multiselect=True,
).execute_async()
else:
selected = music_videos_metadata
music_video_tasks = [
self.get_single_download_item(
music_video_metadata,
)
for music_video_metadata in selected
]
download_items = await safe_gather(*music_video_tasks)
return download_items
async def get_artist_songs_download_items(
self,
songs_metadata: list[dict],
select_all: bool = False,
) -> list[DownloadItem]:
if not select_all:
choices = [
Choice(
name=" | ".join(
[
self.millis_to_min_sec(
song["attributes"]["durationInMillis"]
),
f'{song["attributes"].get("contentRating", "None").title():<8}',
song["attributes"]["name"],
],
),
value=song,
)
for song in songs_metadata
if song.get("attributes")
]
selected = await inquirer.select(
message="Select which songs to download: (Duration | Rating | Title)",
choices=choices,
multiselect=True,
).execute_async()
else:
selected = songs_metadata
song_tasks = [
self.get_single_download_item(
song_metadata,
)
for song_metadata in selected
]
download_items = await safe_gather(*song_tasks)
return download_items
def millis_to_min_sec(self, millis) -> str:
minutes, seconds = divmod(millis // 1000, 60)
return f"{minutes:02}:{seconds:02}"
def get_url_info(self, url: str) -> UrlInfo | None:
match = VALID_URL_PATTERN.match(url)
if not match:
return None
return UrlInfo(
**match.groupdict(),
)
async def get_download_queue(
self,
url_info: UrlInfo,
) -> list[DownloadItem] | None:
return await self._get_download_queue(
"song" if url_info.sub_id else url_info.type or url_info.library_type,
url_info.sub_id or url_info.id or url_info.library_id,
url_info.library_id is not None,
)
async def _get_download_queue(
self,
url_type: str,
id: str,
is_library: bool,
) -> list[DownloadItem] | None:
download_items = []
if url_type in ARTIST_MEDIA_TYPE:
try:
artist_response = await self.interface.apple_music_api.get_artist(
id,
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if artist_response is None:
return None
download_items = await self.get_artist_download_items(
artist_response["data"][0],
)
if url_type in SONG_MEDIA_TYPE:
try:
song_respose = await self.interface.apple_music_api.get_song(id)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if song_respose is None:
return None
download_items.append(
await self.get_single_download_item(song_respose["data"][0])
)
if url_type in ALBUM_MEDIA_TYPE:
try:
if is_library:
album_response = (
await self.interface.apple_music_api.get_library_album(id)
)
else:
album_response = await self.interface.apple_music_api.get_album(id)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if album_response is None:
return None
download_items = await self.get_collection_download_items(
album_response["data"][0],
)
if url_type in PLAYLIST_MEDIA_TYPE:
try:
if is_library:
playlist_response = (
await self.interface.apple_music_api.get_library_playlist(id)
)
else:
playlist_response = (
await self.interface.apple_music_api.get_playlist(id)
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if playlist_response is None:
return None
download_items = await self.get_collection_download_items(
playlist_response["data"][0],
)
if url_type in MUSIC_VIDEO_MEDIA_TYPE:
try:
music_video_response = (
await self.interface.apple_music_api.get_music_video(id)
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if music_video_response is None:
return None
download_items.append(
await self.get_single_download_item(music_video_response["data"][0])
)
if url_type in UPLOADED_VIDEO_MEDIA_TYPE:
try:
uploaded_video = (
await self.interface.apple_music_api.get_uploaded_video(id)
)
except ApiError as e:
if e.status_code == 404:
return None
raise e
if uploaded_video is None:
return None
download_items.append(
await self.get_single_download_item(uploaded_video["data"][0])
)
return download_items
async def download(
self,
download_item: DownloadItem,
) -> DownloadItem:
try:
if download_item.flat_filter_result:
download_item = await self.get_single_download_item_no_filter(
download_item.media_metadata,
download_item.playlist_metadata,
)
if download_item.error:
raise download_item.error
await self._initial_processing(download_item)
await self._download(download_item)
await self._final_processing(download_item)
return download_item
await self._initial_processing(item)
await self._download(item)
await self._final_processing(item)
finally:
if isinstance(download_item, DownloadItem) and not self.skip_processing:
self.base_downloader.cleanup_temp(download_item.random_uuid)
self._cleanup_temp(item.uuid_)
async def _download(
def _update_playlist_file(
self,
download_item: DownloadItem,
playlist_file_path: str,
final_path: str,
playlist_track: int,
) -> None:
if (
self.song_downloader.synced_lyrics_only
and download_item.media_metadata["type"] not in SONG_MEDIA_TYPE
):
raise SyncedLyricsOnly()
log = logger.bind(
action="update_playlist_file",
playlist_file_path=playlist_file_path,
final_path=final_path,
playlist_track=playlist_track,
)
if self.song_downloader.synced_lyrics_only:
return
playlist_file_path_obj = Path(playlist_file_path)
final_path_obj = Path(final_path)
output_dir_obj = Path(self.base.output_path)
if (
Path(download_item.final_path).exists()
and not self.base_downloader.overwrite
):
raise MediaFileExists(download_item.final_path)
playlist_file_path_obj.parent.mkdir(parents=True, exist_ok=True)
playlist_file_path_parent_parts_len = len(playlist_file_path_obj.parent.parts)
output_path_parts_len = len(output_dir_obj.parts)
if (
self.base_downloader.download_mode == DownloadMode.NM3U8DLRE
and not self.base_downloader.full_nm3u8dlre_path
):
raise ExecutableNotFound("N_m3u8DL-RE")
if download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE:
if (
self.music_video_downloader.remux_mode == RemuxMode.FFMPEG
and not self.base_downloader.full_ffmpeg_path
):
raise ExecutableNotFound("ffmpeg")
if (
self.music_video_downloader.remux_mode == RemuxMode.MP4BOX
and not self.base_downloader.full_mp4box_path
):
raise ExecutableNotFound("MP4Box")
if (
download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE
or self.music_video_downloader.remux_mode == RemuxMode.MP4BOX
) and not self.base_downloader.full_mp4decrypt_path:
raise ExecutableNotFound("mp4decrypt")
if (
not download_item.stream_info
or not download_item.stream_info.audio_track
or not download_item.stream_info.audio_track.stream_url
or (
(
not download_item.decryption_key
or not download_item.decryption_key.audio_track
or not download_item.decryption_key.audio_track.key
)
and not self.base_downloader.use_wrapper
final_path_relative = Path(
("../" * (playlist_file_path_parent_parts_len - output_path_parts_len)),
*final_path_obj.parts[output_path_parts_len:],
)
playlist_file_lines = (
playlist_file_path_obj.open("r", encoding="utf8").readlines()
if playlist_file_path_obj.exists()
else []
)
if len(playlist_file_lines) < playlist_track:
playlist_file_lines.extend(
"\n" for _ in range(playlist_track - len(playlist_file_lines))
)
):
raise FormatNotAvailable(download_item.media_metadata["id"])
if download_item.media_metadata["type"] in SONG_MEDIA_TYPE:
await self.song_downloader.download(download_item)
playlist_file_lines[playlist_track - 1] = final_path_relative.as_posix() + "\n"
with playlist_file_path_obj.open("w", encoding="utf8") as playlist_file:
playlist_file.writelines(playlist_file_lines)
if download_item.media_metadata["type"] in MUSIC_VIDEO_MEDIA_TYPE:
await self.music_video_downloader.download(download_item)
log.debug("success")
if download_item.media_metadata["type"] in UPLOADED_VIDEO_MEDIA_TYPE:
await self.uploaded_video_downloader.download(download_item)
def _write_cover(self, cover_path: str, cover_bytes: bytes) -> None:
log = logger.bind(action="write_cover_file", cover_path=cover_path)
async def _initial_processing(
self,
download_item: DownloadItem,
) -> None:
Path(cover_path).parent.mkdir(parents=True, exist_ok=True)
with open(cover_path, "wb") as f:
f.write(cover_bytes)
log.debug("success")
def _write_synced_lyrics(self, synced_lyrics_path: str, lyrics: str) -> None:
log = logger.bind(
action="write_synced_lyrics",
synced_lyrics_path=synced_lyrics_path,
)
Path(synced_lyrics_path).parent.mkdir(parents=True, exist_ok=True)
with open(synced_lyrics_path, "w", encoding="utf-8") as f:
f.write(lyrics)
log.debug("success")
async def _initial_processing(self, item: DownloadItem) -> None:
if self.skip_processing:
return
if download_item.cover_path and self.base_downloader.save_cover:
cover_bytes = await self.interface.get_cover_bytes(download_item.cover_url)
if cover_bytes and (
self.base_downloader.overwrite
or not Path(download_item.cover_path).exists()
):
self.base_downloader.write_cover_image(
if item.playlist_file_path and item.final_path and self.save_playlist:
self._update_playlist_file(
item.playlist_file_path,
item.final_path,
item.media.playlist_tags.track,
)
if item.cover_path and self.save_cover and item.media.cover.url:
cover_bytes = await self.base.interface.base.get_cover_bytes(
item.media.cover.url,
)
if cover_bytes and (self.overwrite or not Path(item.cover_path).exists()):
self._write_cover(
item.cover_path,
cover_bytes,
download_item.cover_path,
)
if (
download_item.lyrics
and download_item.lyrics.synced
and not self.song_downloader.no_synced_lyrics
and (
self.base_downloader.overwrite
or not Path(download_item.synced_lyrics_path).exists()
)
item.synced_lyrics_path
and not self.no_synced_lyrics
and item.media.lyrics
and item.media.lyrics.synced
and (self.overwrite or not Path(item.synced_lyrics_path).exists())
):
self.song_downloader.write_synced_lyrics(
download_item.lyrics.synced,
download_item.synced_lyrics_path,
self._write_synced_lyrics(
item.synced_lyrics_path,
item.media.lyrics.synced,
)
if download_item.playlist_tags and self.base_downloader.save_playlist:
self.base_downloader.update_playlist_file(
download_item.playlist_file_path,
download_item.final_path,
download_item.playlist_tags.playlist_track,
async def _download(self, item: DownloadItem) -> None:
if item.media.error:
raise item.media.error
if self.synced_lyrics_only:
raise GamdlDownloaderSyncedLyricsOnlyError(
"Download mode is set to synced lyrics only"
)
if Path(item.final_path).exists() and not self.overwrite:
raise GamdlDownloaderMediaFileExistsError(item.final_path)
if item.media.media_metadata["type"] in {
"music-videos",
"library-music-videos",
"songs",
"library-songs",
}:
if (
self.base.download_mode == DownloadMode.NM3U8DLRE
and not self.base.full_nm3u8dlre_path
):
raise GamdlDownloaderDependencyNotFoundError("N_m3u8DL-RE")
if item.media.media_metadata["type"] in {"songs", "library-songs"}:
await self.song.download(item)
elif item.media.media_metadata["type"] in {
"music-videos",
"library-music-videos",
}:
if (
self.music_video.remux_mode == RemuxMode.FFMPEG
and not self.base.full_ffmpeg_path
):
raise GamdlDownloaderDependencyNotFoundError("FFmpeg")
if (
self.music_video.remux_mode == RemuxMode.MP4BOX
and not self.base.full_mp4box_path
and not self.base.full_mp4decrypt_path
):
raise GamdlDownloaderDependencyNotFoundError(
"MP4Box and/or mp4decrypt"
)
await self.music_video.download(item)
elif item.media.media_metadata["type"] in {"uploaded-videos"}:
await self.uploaded_video.download(item)
def _move_to_final_path(self, staged_path: str, final_path: str) -> None:
log = logger.bind(
action="move_to_final_path",
staged_path=staged_path,
final_path=final_path,
)
Path(final_path).parent.mkdir(parents=True, exist_ok=True)
shutil.move(staged_path, final_path)
log.debug("success")
async def _final_processing(
self,
download_item: DownloadItem,
item: DownloadItem,
) -> None:
if self.skip_processing:
return
if download_item.staged_path and Path(download_item.staged_path).exists():
self.base_downloader.move_to_final_path(
download_item.staged_path,
download_item.final_path,
if Path(item.staged_path).exists():
self._move_to_final_path(
item.staged_path,
item.final_path,
)
def _cleanup_temp(self, folder_tag: str) -> None:
log = logger.bind(action="cleanup_temp", folder_tag=folder_tag)
temp_path = Path(self.base.temp_path) / TEMP_PATH_TEMPLATE.format(folder_tag)
if temp_path.exists() and temp_path.is_dir() and not self.skip_cleanup:
shutil.rmtree(temp_path, ignore_errors=True)
log.debug("success")
-285
View File
@@ -1,285 +0,0 @@
from pathlib import Path
from ..interface.enums import CoverFormat, MusicVideoCodec, MusicVideoResolution
from ..interface.interface_music_video import AppleMusicMusicVideoInterface
from ..interface.types import DecryptionKeyAv
from ..utils import async_subprocess
from .downloader_base import AppleMusicBaseDownloader
from .enums import RemuxFormatMusicVideo, RemuxMode
from .types import DownloadItem
class AppleMusicMusicVideoDownloader(AppleMusicBaseDownloader):
def __init__(
self,
base_downloader: AppleMusicBaseDownloader,
interface: AppleMusicMusicVideoInterface,
codec_priority: list[MusicVideoCodec] = [
MusicVideoCodec.H264,
MusicVideoCodec.H265,
],
remux_mode: RemuxMode = RemuxMode.FFMPEG,
remux_format: RemuxFormatMusicVideo = RemuxFormatMusicVideo.M4V,
resolution: MusicVideoResolution = MusicVideoResolution.R1080P,
):
self.__dict__.update(base_downloader.__dict__)
self.interface = interface
self.codec_priority = codec_priority
self.remux_mode = remux_mode
self.remux_format = remux_format
self.resolution = resolution
async def remux_mp4box(
self,
input_path_video: str,
input_path_audio: str,
output_path: str,
):
await async_subprocess(
self.full_mp4box_path,
"-quiet",
"-add",
input_path_audio,
"-add",
input_path_video,
"-itags",
"artist=placeholder",
"-keep-utc",
"-new",
output_path,
silent=self.silent,
)
async def remux_ffmpeg(
self,
input_path_video: str,
input_path_audio: str,
output_path: str,
decryption_key: str = None,
):
if decryption_key:
key = [
"-decryption_key",
decryption_key,
]
else:
key = []
await async_subprocess(
self.full_ffmpeg_path,
"-loglevel",
"error",
"-y",
*key,
"-i",
input_path_video,
"-i",
input_path_audio,
"-c",
"copy",
"-c:s",
"mov_text",
"-movflags",
"+faststart",
output_path,
silent=self.silent,
)
async def decrypt_mp4decrypt(
self,
input_path: str,
output_path: str,
decryption_key: str,
):
await async_subprocess(
self.full_mp4decrypt_path,
"--key",
f"1:{decryption_key}",
input_path,
output_path,
silent=self.silent,
)
async def stage(
self,
encrypted_path_video: str,
encrypted_path_audio: str,
decrypted_path_video: str,
decrypted_path_audio: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
):
await self.decrypt_mp4decrypt(
encrypted_path_video,
decrypted_path_video,
decryption_key.video_track.key,
)
await self.decrypt_mp4decrypt(
encrypted_path_audio,
decrypted_path_audio,
decryption_key.audio_track.key,
)
if self.remux_mode == RemuxMode.MP4BOX:
await self.remux_mp4box(
decrypted_path_video,
decrypted_path_audio,
staged_path,
)
else:
await self.remux_ffmpeg(
decrypted_path_video,
decrypted_path_audio,
staged_path,
)
def get_cover_path(
self,
final_path: str,
file_extension: str,
) -> str:
return str(Path(final_path).with_suffix(file_extension))
async def get_download_item(
self,
music_video_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
download_item = DownloadItem()
download_item.media_metadata = music_video_metadata
download_item.playlist_metadata = playlist_metadata
music_video_id = self.interface.get_media_id_of_library_media(
music_video_metadata,
)
itunes_page_metadata = await self.interface.get_itunes_page_metadata(
music_video_metadata,
)
download_item.media_tags = await self.interface.get_tags(
music_video_metadata,
itunes_page_metadata,
)
if playlist_metadata:
download_item.playlist_tags = self.get_playlist_tags(
playlist_metadata,
music_video_metadata,
)
download_item.playlist_file_path = self.get_playlist_file_path(
download_item.playlist_tags,
)
download_item.stream_info = await self.interface.get_stream_info(
music_video_metadata,
itunes_page_metadata,
self.codec_priority,
self.resolution,
)
download_item.decryption_key = await self.interface.get_decryption_key(
download_item.stream_info,
self.cdm,
)
download_item.random_uuid = self.get_random_uuid()
download_item.staged_path = self.get_temp_path(
music_video_id,
download_item.random_uuid,
"staged",
(
"."
+ (
"mp4"
if self.remux_format == RemuxFormatMusicVideo.MP4
else download_item.stream_info.file_format.value
)
),
)
download_item.final_path = self.get_final_path(
download_item.media_tags,
Path(download_item.staged_path).suffix,
playlist_metadata,
)
download_item.cover_url_template = self.interface.get_cover_url_template(
music_video_metadata,
self.cover_format,
)
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
download_item.final_path,
cover_file_extension,
)
return download_item
async def download(
self,
download_item: DownloadItem,
) -> None:
encrypted_path_video = self.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"encrypted_video",
".mp4",
)
encrypted_path_audio = self.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"encrypted_audio",
".m4a",
)
await self.download_stream(
download_item.stream_info.video_track.stream_url,
encrypted_path_video,
)
await self.download_stream(
download_item.stream_info.audio_track.stream_url,
encrypted_path_audio,
)
decrypted_path_video = self.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"decrypted_video",
".mp4",
)
decrypted_path_audio = self.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"decrypted_audio",
".m4a",
)
await self.stage(
encrypted_path_video,
encrypted_path_audio,
decrypted_path_video,
decrypted_path_audio,
download_item.staged_path,
download_item.decryption_key,
)
cover_bytes = (
await self.interface.get_cover_bytes(download_item.cover_url)
if self.cover_format != CoverFormat.RAW
else None
)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
cover_bytes,
)
-248
View File
@@ -1,248 +0,0 @@
from pathlib import Path
from ..interface.enums import CoverFormat, SongCodec, SyncedLyricsFormat
from ..interface.interface_song import AppleMusicSongInterface
from ..interface.types import DecryptionKeyAv
from .amdecrypt import decrypt_file, decrypt_file_hex
from .downloader_base import AppleMusicBaseDownloader
from .types import DownloadItem
class AppleMusicSongDownloader(AppleMusicBaseDownloader):
def __init__(
self,
base_downloader: AppleMusicBaseDownloader,
interface: AppleMusicSongInterface,
codec_priority: SongCodec = [SongCodec.AAC_LEGACY],
synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC,
no_synced_lyrics: bool = False,
synced_lyrics_only: bool = False,
use_album_date: bool = False,
fetch_extra_tags: bool = False,
):
self.__dict__.update(base_downloader.__dict__)
self.interface = interface
self.codec_priority = codec_priority
self.synced_lyrics_format = synced_lyrics_format
self.no_synced_lyrics = no_synced_lyrics
self.synced_lyrics_only = synced_lyrics_only
self.use_album_date = use_album_date
self.fetch_extra_tags = fetch_extra_tags
async def get_download_item(
self,
song_metadata: dict,
playlist_metadata: dict = None,
) -> DownloadItem:
download_item = DownloadItem()
download_item.media_metadata = song_metadata
download_item.playlist_metadata = playlist_metadata
song_id = self.interface.get_media_id_of_library_media(song_metadata)
download_item.lyrics = await self.interface.get_lyrics(
song_metadata,
self.synced_lyrics_format,
)
webplayback = await self.interface.apple_music_api.get_webplayback(song_id)
download_item.media_tags = await self.interface.get_tags(
webplayback,
download_item.lyrics.unsynced if download_item.lyrics else None,
self.use_album_date,
)
if self.fetch_extra_tags:
download_item.extra_tags = await self.interface.get_extra_tags(
song_metadata,
)
if playlist_metadata:
download_item.playlist_tags = self.get_playlist_tags(
playlist_metadata,
song_metadata,
)
download_item.playlist_file_path = self.get_playlist_file_path(
download_item.playlist_tags,
)
download_item.final_path = self.get_final_path(
download_item.media_tags,
".m4a",
download_item.playlist_tags,
)
download_item.synced_lyrics_path = self.get_lyrics_synced_path(
download_item.final_path,
)
if self.synced_lyrics_only:
return download_item
for codec in self.codec_priority:
download_item.stream_info = await self.interface.get_stream_info(
codec,
song_metadata,
webplayback,
)
if download_item.stream_info:
break
if download_item.stream_info.audio_track.legacy:
download_item.decryption_key = (
await self.interface.get_decryption_key_legacy(
download_item.stream_info,
self.cdm,
)
)
elif (
not self.use_wrapper
and download_item.stream_info
and download_item.stream_info.audio_track.widevine_pssh
):
download_item.decryption_key = await self.interface.get_decryption_key(
download_item.stream_info,
self.cdm,
)
download_item.cover_url_template = self.interface.get_cover_url_template(
song_metadata,
self.cover_format,
)
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
download_item.random_uuid = self.get_random_uuid()
if download_item.stream_info and download_item.stream_info.file_format:
download_item.staged_path = self.get_temp_path(
song_id,
download_item.random_uuid,
"staged",
"." + download_item.stream_info.file_format.value,
)
else:
download_item.staged_path = None
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
download_item.final_path,
cover_file_extension,
)
return download_item
async def decrypt_amdecrypt(
self,
input_path: str,
output_path: str,
media_id: str,
fairplay_key: str,
) -> None:
await decrypt_file(
self.wrapper_decrypt_ip,
media_id,
fairplay_key,
input_path,
output_path,
)
async def decrypt_amdecrypt_hex(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool = False,
) -> None:
await decrypt_file_hex(
input_path,
output_path,
decryption_key,
legacy=legacy,
)
async def stage(
self,
encrypted_path: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
legacy: bool,
media_id: str,
fairplay_key: str,
):
if self.use_wrapper and not legacy:
await self.decrypt_amdecrypt(
encrypted_path,
staged_path,
media_id,
fairplay_key,
)
else:
await self.decrypt_amdecrypt_hex(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
legacy,
)
def get_lyrics_synced_path(self, final_path: str) -> str:
return str(Path(final_path).with_suffix("." + self.synced_lyrics_format.value))
def get_cover_path(
self,
final_path: str,
file_extension: str,
) -> str:
return str(Path(final_path).parent / ("Cover" + file_extension))
def write_synced_lyrics(
self,
synced_lyrics: str,
lyrics_synced_path: str,
):
Path(lyrics_synced_path).parent.mkdir(parents=True, exist_ok=True)
Path(lyrics_synced_path).write_text(synced_lyrics, encoding="utf8")
async def download(
self,
download_item: DownloadItem,
) -> None:
if self.synced_lyrics_only:
return
encrypted_path = self.get_temp_path(
download_item.media_metadata["id"],
download_item.random_uuid,
"encrypted",
".m4a",
)
await self.download_stream(
download_item.stream_info.audio_track.stream_url,
encrypted_path,
)
await self.stage(
encrypted_path,
download_item.staged_path,
download_item.decryption_key,
download_item.stream_info.audio_track.legacy,
download_item.media_metadata["id"],
download_item.stream_info.audio_track.fairplay_key,
)
cover_bytes = (
await self.interface.get_cover_bytes(download_item.cover_url)
if self.cover_format != CoverFormat.RAW
else None
)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
cover_bytes,
download_item.extra_tags,
)
@@ -1,107 +0,0 @@
from pathlib import Path
from ..interface.enums import CoverFormat, UploadedVideoQuality
from ..interface.interface_uploaded_video import AppleMusicUploadedVideoInterface
from .downloader_base import AppleMusicBaseDownloader
from .types import DownloadItem
class AppleMusicUploadedVideoDownloader(AppleMusicBaseDownloader):
def __init__(
self,
base_downloader: AppleMusicBaseDownloader,
interface: AppleMusicUploadedVideoInterface,
quality: UploadedVideoQuality = UploadedVideoQuality.BEST,
):
self.__dict__.update(base_downloader.__dict__)
self.interface = interface
self.quality = quality
def get_cover_path(self, final_path: str, file_extension: str) -> str:
return str(Path(final_path).with_suffix(file_extension))
async def get_download_item(
self,
uploaded_video_metadata: dict,
) -> DownloadItem:
try:
return await self._get_download_item(
uploaded_video_metadata,
)
except Exception as e:
return DownloadItem(
media_metadata=uploaded_video_metadata,
error=e,
)
async def _get_download_item(
self,
uploaded_video_metadata: dict,
) -> DownloadItem:
download_item = DownloadItem()
download_item.media_metadata = uploaded_video_metadata
download_item.media_tags = self.interface.get_tags(
uploaded_video_metadata,
)
download_item.stream_info = await self.interface.get_stream_info(
uploaded_video_metadata,
self.quality,
)
download_item.random_uuid = self.get_random_uuid()
download_item.staged_path = self.get_temp_path(
uploaded_video_metadata["id"],
download_item.random_uuid,
"staged",
"." + download_item.stream_info.file_format.value,
)
download_item.final_path = self.get_final_path(
download_item.media_tags,
Path(download_item.staged_path).suffix,
None,
)
download_item.cover_url_template = self.interface.get_cover_url_template(
uploaded_video_metadata,
self.cover_format,
)
download_item.cover_url = self.interface.get_cover_url(
download_item.cover_url_template,
self.cover_size,
self.cover_format,
)
cover_file_extension = await self.interface.get_cover_file_extension(
download_item.cover_url,
self.cover_format,
)
if cover_file_extension:
download_item.cover_path = self.get_cover_path(
download_item.final_path,
cover_file_extension,
)
return download_item
async def download(
self,
download_item: DownloadItem,
) -> None:
await self.download_ytdlp(
download_item.stream_info.video_track.stream_url,
download_item.staged_path,
)
cover_bytes = (
await self.interface.get_cover_bytes(download_item.cover_url)
if self.cover_format != CoverFormat.RAW
else None
)
await self.apply_tags(
download_item.staged_path,
download_item.media_tags,
cover_bytes,
)
-22
View File
@@ -1,10 +1,5 @@
from enum import Enum
from .constants import (
ARTIST_AUTO_SELECT_KEY_MAP,
ARTIST_AUTO_SELECT_STR_MAP,
)
class DownloadMode(Enum):
YTDLP = "ytdlp"
@@ -19,20 +14,3 @@ class RemuxMode(Enum):
class RemuxFormatMusicVideo(Enum):
M4V = "m4v"
MP4 = "mp4"
class ArtistAutoSelect(Enum):
MAIN_ALBUMS = "main-albums"
COMPILATION_ALBUMS = "compilation-albums"
LIVE_ALBUMS = "live-albums"
SINGLES_EPS = "singles-eps"
ALL_ALBUMS = "all-albums"
TOP_SONGS = "top-songs"
MUSIC_VIDEOS = "music-videos"
@property
def path_key(self) -> tuple[str, str]:
return ARTIST_AUTO_SELECT_KEY_MAP[self.value]
def __str__(self) -> str:
return ARTIST_AUTO_SELECT_STR_MAP[self.value]
+11 -22
View File
@@ -1,31 +1,20 @@
from ..utils import GamdlError
class MediaFileExists(GamdlError):
def __init__(self, media_path: str):
super().__init__(f"Media file already exists at path: {media_path}")
class GamdlDownloaderError(GamdlError):
pass
class NotStreamable(GamdlError):
def __init__(self, media_id: str):
super().__init__(f"Media ID is not streamable: {media_id}")
class GamdlDownloaderSyncedLyricsOnlyError(GamdlDownloaderError):
def __init__(self) -> None:
super().__init__("Download mode is set to synced lyrics only")
class FormatNotAvailable(GamdlError):
def __init__(self, media_id: str):
super().__init__(f"Requested format is not available for media ID: {media_id}")
class GamdlDownloaderMediaFileExistsError(GamdlDownloaderError):
def __init__(self, file_path: str) -> None:
super().__init__(f"Media file already exists: {file_path}")
class ExecutableNotFound(GamdlError):
def __init__(self, executable: str):
super().__init__(f"Executable not found: {executable}")
class SyncedLyricsOnly(GamdlError):
def __init__(self):
super().__init__("Only downloading synced lyrics is supported")
class UnsupportedMediaType(GamdlError):
def __init__(self, media_type: str):
super().__init__(f"Unsupported media type: {media_type}")
class GamdlDownloaderDependencyNotFoundError(GamdlDownloaderError):
def __init__(self, dependency_name: str) -> None:
super().__init__(f"Required dependency not found: {dependency_name}")
-3
View File
@@ -1,3 +0,0 @@
# Dumped from Android Studio Virtual Device running Android 9
HARDCODED_WVD = """V1ZEAgIDAASoMIIEpAIBAAKCAQEAwnCFAPXy4U1J7p1NohAS+xl040f5FBaE/59bPp301bGz0UGFT9VoEtY3vaeakKh/d319xTNvCSWsEDRaMmp/wSnMiEZUkkl04872jx2uHuR4k6KYuuJoqhsIo1TwUBueFZynHBUJzXQeW8Eb1tYAROGwp8W7r+b0RIjHC89RFnfVXpYlF5I6McktyzJNSOwlQbMqlVihfSUkv3WRd3HFmA0Oxay51CEIkoTlNTHVlzVyhov5eHCDSp7QENRgaaQ03jC/CcgFOoQymhsBtRCM0CQmfuAHjA9e77R6m/GJPy75G9fqoZM1RMzVDHKbKZPd3sFd0c0+77gLzW8cWEaaHwIDAQABAoIBAQCB2pN46MikHvHZIcTPDt0eRQoDH/YArGl2Lf7J+sOgU2U7wv49KtCug9IGHwDiyyUVsAFmycrF2RroV45FTUq0vi2SdSXV7Kjb20Ren/vBNeQw9M37QWmU8Sj7q6YyWb9hv5T69DHvvDTqIjVtbM4RMojAAxYti5hmjNIh2PrWfVYWhXxCQ/WqAjWLtZBM6Oww1byfr5I/wFogAKkgHi8wYXZ4LnIC8V7jLAhujlToOvMMC9qwcBiPKDP2FO+CPSXaqVhH+LPSEgLggnU3EirihgxovbLNAuDEeEbRTyR70B0lW19tLHixso4ZQa7KxlVUwOmrHSZf7nVuWqPpxd+BAoGBAPQLyJ1IeRavmaU8XXxfMdYDoc8+xB7v2WaxkGXb6ToX1IWPkbMz4yyVGdB5PciIP3rLZ6s1+ruuRRV0IZ98i1OuN5TSR56ShCGg3zkd5C4L/xSMAz+NDfYSDBdO8BVvBsw21KqSRUi1ctL7QiIvfedrtGb5XrE4zhH0gjXlU5qZAoGBAMv2segn0Jx6az4rqRa2Y7zRx4iZ77JUqYDBI8WMnFeR54uiioTQ+rOs3zK2fGIWlrn4ohco/STHQSUTB8oCOFLMx1BkOqiR+UyebO28DJY7+V9ZmxB2Guyi7W8VScJcIdpSOPyJFOWZQKXdQFW3YICD2/toUx/pDAJh1sEVQsV3AoGBANyyp1rthmvoo5cVbymhYQ08vaERDwU3PLCtFXu4E0Ow90VNn6Ki4ueXcv/gFOp7pISk2/yuVTBTGjCblCiJ1en4HFWekJwrvgg3Vodtq8Okn6pyMCHRqvWEPqD5hw6rGEensk0K+FMXnF6GULlfn4mgEkYpb+PvDhSYvQSGfkPJAoGAF/bAKFqlM/1eJEvU7go35bNwEiij9Pvlfm8y2L8Qj2lhHxLV240CJ6IkBz1Rl+S3iNohkT8LnwqaKNT3kVB5daEBufxMuAmOlOX4PmZdxDj/r6hDg8ecmjj6VJbXt7JDd/c5ItKoVeGPqu035dpJyE+1xPAY9CLZel4scTsiQTkCgYBt3buRcZMwnc4qqpOOQcXK+DWD6QvpkcJ55ygHYw97iP/lF4euwdHd+I5b+11pJBAao7G0fHX3eSjqOmzReSKboSe5L8ZLB2cAI8AsKTBfKHWmCa8kDtgQuI86fUfirCGdhdA9AVP2QXN2eNCuPnFWi0WHm4fYuUB5be2c18ucxAb9CAESmgsK3QMIAhIQ071yBlsbLoO2CSB9Ds0cmRif6uevBiKOAjCCAQoCggEBAMJwhQD18uFNSe6dTaIQEvsZdONH+RQWhP+fWz6d9NWxs9FBhU/VaBLWN72nmpCof3d9fcUzbwklrBA0WjJqf8EpzIhGVJJJdOPO9o8drh7keJOimLriaKobCKNU8FAbnhWcpxwVCc10HlvBG9bWAEThsKfFu6/m9ESIxwvPURZ31V6WJReSOjHJLcsyTUjsJUGzKpVYoX0lJL91kXdxxZgNDsWsudQhCJKE5TUx1Zc1coaL+Xhwg0qe0BDUYGmkNN4wvwnIBTqEMpobAbUQjNAkJn7gB4wPXu+0epvxiT8u+RvX6qGTNUTM1QxymymT3d7BXdHNPu+4C81vHFhGmh8CAwEAASjwIkgBUqoBCAEQABqBAQQlRbfiBNDb6eU6aKrsH5WJaYszTioXjPLrWN9dqyW0vwfT11kgF0BbCGkAXew2tLJJqIuD95cjJvyGUSN6VyhL6dp44fWEGDSBIPR0mvRq7bMP+m7Y/RLKf83+OyVJu/BpxivQGC5YDL9f1/A8eLhTDNKXs4Ia5DrmTWdPTPBL8SIgyfUtg3ofI+/I9Tf7it7xXpT0AbQBJfNkcNXGpO3JcBMSgAIL5xsXK5of1mMwAl6ygN1Gsj4aZ052otnwN7kXk12SMsXheWTZ/PYh2KRzmt9RPS1T8hyFx/Kp5VkBV2vTAqqWrGw/dh4URqiHATZJUlhO7PN5m2Kq1LVFdXjWSzP5XBF2S83UMe+YruNHpE5GQrSyZcBqHO0QrdPcU35GBT7S7+IJr2AAXvnjqnb8yrtpPWN2ZW/IWUJN2z4vZ7/HV4aj3OZhkxC1DIMNyvsusUKoQQuf8gwKiEe8cFwbwFSicywlFk9la2IPe8oFShcxAzHLCCn/TIYUAvEL3/4LgaZvqWm80qCPYbgIP5HT8hPYkKWJ4WYknEWK+3InbnkzteFfGrQFCq4CCAESEGnj6Ji7LD+4o7MoHYT4jBQYjtW+kQUijgIwggEKAoIBAQDY9um1ifBRIOmkPtDZTqH+CZUBbb0eK0Cn3NHFf8MFUDzPEz+emK/OTub/hNxCJCao//pP5L8tRNUPFDrrvCBMo7Rn+iUb+mA/2yXiJ6ivqcN9Cu9i5qOU1ygon9SWZRsujFFB8nxVreY5Lzeq0283zn1Cg1stcX4tOHT7utPzFG/ReDFQt0O/GLlzVwB0d1sn3SKMO4XLjhZdncrtF9jljpg7xjMIlnWJUqxDo7TQkTytJmUl0kcM7bndBLerAdJFGaXc6oSY4eNy/IGDluLCQR3KZEQsy/mLeV1ggQ44MFr7XOM+rd+4/314q/deQbjHqjWFuVr8iIaKbq+R63ShAgMBAAEo8CISgAMii2Mw6z+Qs1bvvxGStie9tpcgoO2uAt5Zvv0CDXvrFlwnSbo+qR71Ru2IlZWVSbN5XYSIDwcwBzHjY8rNr3fgsXtSJty425djNQtF5+J2jrAhf3Q2m7EI5aohZGpD2E0cr+dVj9o8x0uJR2NWR8FVoVQSXZpad3M/4QzBLNto/tz+UKyZwa7Sc/eTQc2+ZcDS3ZEO3lGRsH864Kf/cEGvJRBBqcpJXKfG+ItqEW1AAPptjuggzmZEzRq5xTGf6or+bXrKjCpBS9G1SOyvCNF1k5z6lG8KsXhgQxL6ADHMoulxvUIihyPY5MpimdXfUdEQ5HA2EqNiNVNIO4qP007jW51yAeThOry4J22xs8RdkIClOGAauLIl0lLA4flMzW+VfQl5xYxP0E5tuhn0h+844DslU8ZF7U1dU2QprIApffXD9wgAACk26Rggy8e96z8i86/+YYyZQkc9hIdCAERrgEYCEbByzONrdRDs1MrS/ch1moV5pJv63BIKvQHGvLkaFwoMY29tcGFueV9uYW1lEgd1bmtub3duGioKCm1vZGVsX25hbWUSHEFuZHJvaWQgU0RLIGJ1aWx0IGZvciB4ODZfNjQaGwoRYXJjaGl0ZWN0dXJlX25hbWUSBng4Nl82NBodCgtkZXZpY2VfbmFtZRIOZ2VuZXJpY194ODZfNjQaIAoMcHJvZHVjdF9uYW1lEhBzZGtfcGhvbmVfeDg2XzY0GmMKCmJ1aWxkX2luZm8SVUFuZHJvaWQvc2RrX3Bob25lX3g4Nl82NC9nZW5lcmljX3g4Nl82NDo5L1BTUjEuMTgwNzIwLjAxMi80OTIzMjE0OnVzZXJkZWJ1Zy90ZXN0LWtleXMaHgoUd2lkZXZpbmVfY2RtX3ZlcnNpb24SBjE0LjAuMBokCh9vZW1fY3J5cHRvX3NlY3VyaXR5X3BhdGNoX2xldmVsEgEwMg4QASAAKA0wAEAASABQAA=="""
+223
View File
@@ -0,0 +1,223 @@
from pathlib import Path
from ..interface.enums import CoverFormat
from ..interface.types import AppleMusicMedia, DecryptionKeyAv
from ..utils import async_subprocess
from .base import AppleMusicBaseDownloader
from .enums import RemuxFormatMusicVideo, RemuxMode
from .types import DownloadItem
class AppleMusicMusicVideoDownloader:
def __init__(
self,
base: AppleMusicBaseDownloader,
remux_mode: RemuxMode = RemuxMode.FFMPEG,
remux_format: RemuxFormatMusicVideo = RemuxFormatMusicVideo.M4V,
):
self.base = base
self.remux_mode = remux_mode
self.remux_format = remux_format
async def _remux_mp4box(
self,
input_path_video: str,
input_path_audio: str,
output_path: str,
):
await async_subprocess(
self.base.full_mp4box_path,
"-quiet",
"-add",
input_path_audio,
"-add",
input_path_video,
"-itags",
"artist=placeholder",
"-keep-utc",
"-new",
output_path,
silent=self.base.silent,
)
async def _remux_ffmpeg(
self,
input_path_video: str,
input_path_audio: str,
output_path: str,
decryption_key: str = None,
):
if decryption_key:
key = [
"-decryption_key",
decryption_key,
]
else:
key = []
await async_subprocess(
self.base.full_ffmpeg_path,
"-loglevel",
"error",
"-y",
*key,
"-i",
input_path_video,
"-i",
input_path_audio,
"-c",
"copy",
"-c:s",
"mov_text",
"-movflags",
"+faststart",
output_path,
silent=self.base.silent,
)
async def _decrypt_mp4decrypt(
self,
input_path: str,
output_path: str,
decryption_key: str,
):
await async_subprocess(
self.base.full_mp4decrypt_path,
"--key",
f"1:{decryption_key}",
input_path,
output_path,
silent=self.base.silent,
)
async def stage(
self,
encrypted_path_video: str,
encrypted_path_audio: str,
decrypted_path_video: str,
decrypted_path_audio: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
):
await self._decrypt_mp4decrypt(
encrypted_path_video,
decrypted_path_video,
decryption_key.video_track.key,
)
await self._decrypt_mp4decrypt(
encrypted_path_audio,
decrypted_path_audio,
decryption_key.audio_track.key,
)
if self.remux_mode == RemuxMode.MP4BOX:
await self._remux_mp4box(
decrypted_path_video,
decrypted_path_audio,
staged_path,
)
else:
await self._remux_ffmpeg(
decrypted_path_video,
decrypted_path_audio,
staged_path,
)
def get_cover_path(
self,
final_path: str,
file_extension: str,
) -> str:
return str(Path(final_path).with_suffix(file_extension))
async def get_download_item(
self,
media: AppleMusicMedia,
) -> DownloadItem:
download_item = DownloadItem(media)
download_item.staged_path = self.base.get_temp_path(
media.media_metadata["id"],
download_item.uuid_,
"staged",
"." + media.stream_info.file_format.value,
)
download_item.final_path = self.base.get_final_path(
media.tags,
"." + media.stream_info.file_format.value,
media.playlist_tags,
)
if media.playlist_tags:
download_item.playlist_file_path = self.base.get_playlist_file_path(
media.playlist_tags,
)
download_item.cover_path = self.get_cover_path(
download_item.final_path,
media.cover.file_extension,
)
return download_item
async def download(
self,
download_item: DownloadItem,
) -> None:
encrypted_path_video = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"encrypted_video",
".mp4",
)
encrypted_path_audio = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"encrypted_audio",
".m4a",
)
await self.base.download_stream(
download_item.media.stream_info.video_track.stream_url,
encrypted_path_video,
)
await self.base.download_stream(
download_item.media.stream_info.audio_track.stream_url,
encrypted_path_audio,
)
decrypted_path_video = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"decrypted_video",
".mp4",
)
decrypted_path_audio = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"decrypted_audio",
".m4a",
)
await self.stage(
encrypted_path_video,
encrypted_path_audio,
decrypted_path_video,
decrypted_path_audio,
download_item.staged_path,
download_item.media.decryption_key,
)
cover_bytes = (
await self.base.interface.base.get_cover_bytes(
download_item.media.cover.url
)
if self.base.interface.base.cover_format != CoverFormat.RAW
else None
)
await self.base.apply_tags(
download_item.staged_path,
download_item.media.tags,
cover_bytes,
)
+180
View File
@@ -0,0 +1,180 @@
from pathlib import Path
import structlog
from ..interface.enums import CoverFormat
from ..interface.types import AppleMusicMedia, DecryptionKeyAv
from .amdecrypt import decrypt_file, decrypt_file_hex
from .base import AppleMusicBaseDownloader
from .types import DownloadItem
logger = structlog.get_logger(__name__)
class AppleMusicSongDownloader:
def __init__(
self,
base: AppleMusicBaseDownloader,
):
self.base = base
async def get_download_item(self, media: AppleMusicMedia) -> DownloadItem:
download_item = DownloadItem(media)
download_item.staged_path = self.base.get_temp_path(
media.media_metadata["id"],
download_item.uuid_,
"staged",
"." + media.stream_info.file_format.value,
)
download_item.final_path = self.base.get_final_path(
media.tags,
".m4a",
media.playlist_tags,
)
if media.playlist_tags:
download_item.playlist_file_path = self.base.get_playlist_file_path(
media.playlist_tags,
)
download_item.synced_lyrics_path = self.get_synced_lyrics_path(
download_item.final_path
)
download_item.cover_path = self.get_cover_path(
download_item.final_path,
media.cover.file_extension,
)
return download_item
async def _decrypt_amdecrypt(
self,
input_path: str,
output_path: str,
media_id: str,
fairplay_key: str,
) -> None:
await decrypt_file(
self.base.wrapper_decrypt_ip,
media_id,
fairplay_key,
input_path,
output_path,
)
async def _decrypt_amdecrypt_hex(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool = False,
) -> None:
await decrypt_file_hex(
input_path,
output_path,
decryption_key,
legacy=legacy,
)
async def stage(
self,
encrypted_path: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
legacy: bool,
media_id: str,
fairplay_key: str,
):
log = logger.bind(
action="stage_song",
media_id=media_id,
encrypted_path=encrypted_path,
staged_path=staged_path,
)
if self.base.use_wrapper and not legacy:
await self._decrypt_amdecrypt(
encrypted_path,
staged_path,
media_id,
fairplay_key,
)
else:
await self._decrypt_amdecrypt_hex(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
legacy,
)
log.debug("success")
def get_synced_lyrics_path(self, final_path: str) -> str:
log = logger.bind(action="get_synced_lyrics_path", final_path=final_path)
synced_lyrics_path = str(
Path(final_path).with_suffix(
"." + self.base.interface.song.synced_lyrics_format.value
)
)
log.debug("success", synced_lyrics_path=synced_lyrics_path)
return synced_lyrics_path
def get_cover_path(
self,
final_path: str,
file_extension: str,
) -> str:
log = logger.bind(
action="get_song_cover_path",
final_path=final_path,
file_extension=file_extension,
)
cover_path = str(Path(final_path).parent / ("Cover" + file_extension))
log.debug("success", cover_path=cover_path)
return cover_path
async def download(
self,
download_item: DownloadItem,
) -> None:
encrypted_path = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"encrypted",
".m4a",
)
await self.base.download_stream(
download_item.media.stream_info.audio_track.stream_url,
encrypted_path,
)
await self.stage(
encrypted_path,
download_item.staged_path,
download_item.media.decryption_key,
download_item.media.stream_info.audio_track.legacy,
download_item.media.media_metadata["id"],
download_item.media.stream_info.audio_track.fairplay_key,
)
cover_bytes = (
await self.base.interface.base.get_cover_bytes(
download_item.media.cover.url
)
if self.base.interface.base.cover_format != CoverFormat.RAW
else None
)
await self.base.apply_tags(
download_item.staged_path,
download_item.media.tags,
cover_bytes,
)
+4 -33
View File
@@ -1,44 +1,15 @@
import uuid
from dataclasses import dataclass
from typing import Any
from ..interface.types import (
DecryptionKeyAv,
Lyrics,
MediaTags,
PlaylistTags,
StreamInfoAv,
)
from ..interface.types import AppleMusicMedia
@dataclass
class DownloadItem:
media_metadata: dict = None
playlist_metadata: dict = None
random_uuid: str = None
lyrics: Lyrics = None
media_tags: MediaTags = None
extra_tags: dict = None
playlist_tags: PlaylistTags = None
stream_info: StreamInfoAv = None
decryption_key: DecryptionKeyAv = None
cover_url_template: str = None
cover_url: str = None
media: AppleMusicMedia
uuid_: str = uuid.uuid4().hex[:8]
staged_path: str = None
final_path: str = None
playlist_file_path: str = None
synced_lyrics_path: str = None
cover_path: str = None
flat_filter_result: Any = None
error: Exception = None
@dataclass
class UrlInfo:
storefront: str = None
type: str = None
slug: str = None
id: str = None
sub_id: str = None
library_storefront: str = None
library_type: str = None
library_id: str = None
+65
View File
@@ -0,0 +1,65 @@
from pathlib import Path
from ..interface.enums import CoverFormat
from ..interface.types import AppleMusicMedia
from .base import AppleMusicBaseDownloader
from .types import DownloadItem
class AppleMusicUploadedVideoDownloader:
def __init__(
self,
base: AppleMusicBaseDownloader,
):
self.base = base
def get_cover_path(self, final_path: str, file_extension: str) -> str:
return str(Path(final_path).with_suffix(file_extension))
async def get_download_item(
self,
media: AppleMusicMedia,
) -> DownloadItem:
download_item = DownloadItem(media)
download_item.staged_path = self.base.get_temp_path(
media.media_metadata["id"],
download_item.uuid_,
"staged",
"." + media.stream_info.file_format.value,
)
download_item.final_path = self.base.get_final_path(
media.tags,
"." + media.stream_info.file_format.value,
media.playlist_tags,
)
download_item.cover_path = self.get_cover_path(
download_item.final_path,
media.cover.file_extension,
)
return download_item
async def download(
self,
download_item: DownloadItem,
) -> None:
await self.base._download_ytdlp_async(
download_item.media.stream_info.video_track.stream_url,
download_item.staged_path,
)
cover_bytes = (
await self.base.interface.base.get_cover_bytes(
download_item.media.cover.url
)
if self.base.interface.base.cover_format != CoverFormat.RAW
else None
)
await self.base.apply_tags(
download_item.staged_path,
download_item.media.tags,
cover_bytes,
)