Remove gamdl core modules and CLI implementation

This commit is contained in:
Rafael Moraes
2025-10-19 17:44:56 -03:00
parent ffe261388a
commit 0b440fd850
14 changed files with 0 additions and 3675 deletions
-3
View File
@@ -1,3 +0,0 @@
from .cli import main
main()
-653
View File
@@ -1,653 +0,0 @@
from __future__ import annotations
import inspect
import logging
import typing
from pathlib import Path
import click
import colorama
from . import __version__
from .apple_music_api import AppleMusicApi
from .config_file import ConfigFile
from .constants import *
from .custom_logger_formatter import CustomLoggerFormatter
from .downloader import Downloader
from .downloader_music_video import DownloaderMusicVideo
from .downloader_post import DownloaderPost
from .downloader_song import DownloaderSong
from .enums import (
CoverFormat,
DownloadMode,
MusicVideoCodec,
MusicVideoResolution,
PostQuality,
RemuxFormatMusicVideo,
RemuxMode,
SongCodec,
SyncedLyricsFormat,
)
from .exceptions import *
from .itunes_api import ItunesApi
from .utils import color_text, prompt_path
apple_music_api_from_netscape_cookies_sig = inspect.signature(
AppleMusicApi.from_netscape_cookies
)
downloader_sig = inspect.signature(Downloader.__init__)
downloader_song_sig = inspect.signature(DownloaderSong.__init__)
downloader_music_video_sig = inspect.signature(DownloaderMusicVideo.__init__)
downloader_post_sig = inspect.signature(DownloaderPost.__init__)
logger = logging.getLogger("gamdl")
class Csv(click.ParamType):
name = "csv"
def __init__(
self,
subtype: typing.Any,
) -> None:
self.subtype = subtype
def convert(
self,
value: str | typing.Any,
param: click.Parameter,
ctx: click.Context,
) -> list[typing.Any]:
if not isinstance(value, str):
return value
items = [v.strip() for v in value.split(",") if v.strip()]
result = []
for item in items:
try:
result.append(self.subtype(item))
except ValueError as e:
self.fail(
f"'{item}' is not a valid value for {self.subtype.__name__}",
param,
ctx,
)
return result
def load_config_file(
ctx: click.Context,
param: click.Parameter,
no_config_file: bool,
) -> click.Context:
if no_config_file:
return ctx
filtered_params = [
param
for param in ctx.command.params
if param.name not in EXCLUDED_CONFIG_FILE_PARAMS
]
config_file = ConfigFile(ctx.params["config_path"])
config_file.add_params_default_to_config(
filtered_params,
)
parsed_params = config_file.parse_params_from_config(
[
param
for param in filtered_params
if ctx.get_parameter_source(param.name)
!= click.core.ParameterSource.COMMANDLINE
]
)
ctx.params.update(parsed_params)
return ctx
@click.command()
@click.help_option("-h", "--help")
@click.version_option(__version__, "-v", "--version")
# CLI specific options
@click.argument(
"urls",
nargs=-1,
type=str,
required=True,
)
@click.option(
"--disable-music-video-skip",
is_flag=True,
help="Don't skip downloading music videos in albums/playlists.",
)
@click.option(
"--read-urls-as-txt",
"-r",
is_flag=True,
help="Interpret URLs as paths to text files containing URLs separated by newlines",
)
@click.option(
"--config-path",
type=Path,
default=Path.home() / ".gamdl" / "config.ini",
help="Path to config file.",
)
@click.option(
"--log-level",
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"]),
default="INFO",
help="Log level.",
)
@click.option(
"--no-exceptions",
is_flag=True,
help="Don't print exceptions.",
)
# API specific options
@click.option(
"--cookies-path",
"-c",
type=Path,
default=apple_music_api_from_netscape_cookies_sig.parameters[
"cookies_path"
].default,
help="Path to .txt cookies file.",
)
@click.option(
"--language",
"-l",
type=str,
default=apple_music_api_from_netscape_cookies_sig.parameters["language"].default,
help="Metadata language as an ISO-2A language code (don't always work for videos).",
)
# Downloader specific options
@click.option(
"--output-path",
"-o",
type=Path,
default=downloader_sig.parameters["output_path"].default,
help="Path to output directory.",
)
@click.option(
"--temp-path",
type=Path,
default=downloader_sig.parameters["temp_path"].default,
help="Path to temporary directory.",
)
@click.option(
"--wvd-path",
type=Path,
default=downloader_sig.parameters["wvd_path"].default,
help="Path to .wvd file.",
)
@click.option(
"--overwrite",
is_flag=True,
help="Overwrite existing files.",
default=downloader_sig.parameters["overwrite"].default,
)
@click.option(
"--save-cover",
"-s",
is_flag=True,
help="Save cover as a separate file.",
default=downloader_sig.parameters["save_cover"].default,
)
@click.option(
"--save-playlist",
is_flag=True,
help="Save a M3U8 playlist file when downloading a playlist.",
default=downloader_sig.parameters["save_playlist"].default,
)
@click.option(
"--no-synced-lyrics",
is_flag=True,
help="Don't download the synced lyrics.",
default=downloader_sig.parameters["no_synced_lyrics"].default,
)
@click.option(
"--synced-lyrics-only",
is_flag=True,
help="Download only the synced lyrics.",
default=downloader_sig.parameters["synced_lyrics_only"].default,
)
@click.option(
"--nm3u8dlre-path",
type=str,
default=downloader_sig.parameters["nm3u8dlre_path"].default,
help="Path to N_m3u8DL-RE binary.",
)
@click.option(
"--mp4decrypt-path",
type=str,
default=downloader_sig.parameters["mp4decrypt_path"].default,
help="Path to mp4decrypt binary.",
)
@click.option(
"--ffmpeg-path",
type=str,
default=downloader_sig.parameters["ffmpeg_path"].default,
help="Path to FFmpeg binary.",
)
@click.option(
"--mp4box-path",
type=str,
default=downloader_sig.parameters["mp4box_path"].default,
help="Path to MP4Box binary.",
)
@click.option(
"--download-mode",
type=DownloadMode,
default=downloader_sig.parameters["download_mode"].default,
help="Download mode.",
)
@click.option(
"--remux-mode",
type=RemuxMode,
default=downloader_sig.parameters["remux_mode"].default,
help="Remux mode.",
)
@click.option(
"--cover-format",
type=CoverFormat,
default=downloader_sig.parameters["cover_format"].default,
help="Cover format.",
)
@click.option(
"--template-folder-album",
type=str,
default=downloader_sig.parameters["template_folder_album"].default,
help="Template folder for tracks that are part of an album.",
)
@click.option(
"--template-folder-compilation",
type=str,
default=downloader_sig.parameters["template_folder_compilation"].default,
help="Template folder for tracks that are part of a compilation album.",
)
@click.option(
"--template-file-single-disc",
type=str,
default=downloader_sig.parameters["template_file_single_disc"].default,
help="Template file for the tracks that are part of a single-disc album.",
)
@click.option(
"--template-file-multi-disc",
type=str,
default=downloader_sig.parameters["template_file_multi_disc"].default,
help="Template file for the tracks that are part of a multi-disc album.",
)
@click.option(
"--template-folder-no-album",
type=str,
default=downloader_sig.parameters["template_folder_no_album"].default,
help="Template folder for the tracks that are not part of an album.",
)
@click.option(
"--template-file-no-album",
type=str,
default=downloader_sig.parameters["template_file_no_album"].default,
help="Template file for the tracks that are not part of an album.",
)
@click.option(
"--template-file-playlist",
type=str,
default=downloader_sig.parameters["template_file_playlist"].default,
help="Template file for the M3U8 playlist.",
)
@click.option(
"--template-date",
type=str,
default=downloader_sig.parameters["template_date"].default,
help="Date tag template.",
)
@click.option(
"--exclude-tags",
type=Csv(str),
default=downloader_sig.parameters["exclude_tags"].default,
help="Comma-separated tags to exclude.",
)
@click.option(
"--cover-size",
type=int,
default=downloader_sig.parameters["cover_size"].default,
help="Cover size.",
)
@click.option(
"--truncate",
type=int,
default=downloader_sig.parameters["truncate"].default,
help="Maximum length of the file/folder names.",
)
@click.option(
"--database-path",
type=Path,
default=downloader_sig.parameters["database_path"].default,
help="Path to the downloaded media database file.",
)
# DownloaderSong specific options
@click.option(
"--codec-song",
type=SongCodec,
default=downloader_song_sig.parameters["codec"].default,
help="Song codec.",
)
@click.option(
"--synced-lyrics-format",
type=SyncedLyricsFormat,
default=downloader_song_sig.parameters["synced_lyrics_format"].default,
help="Synced lyrics format.",
)
# DownloaderMusicVideo specific options
@click.option(
"--codec-music-video",
type=Csv(MusicVideoCodec),
default=downloader_music_video_sig.parameters["codec"].default,
help="Comma-separated music video codec priority.",
)
@click.option(
"--remux-format-music-video",
type=RemuxFormatMusicVideo,
default=downloader_music_video_sig.parameters["remux_format"].default,
help="Music video remux format.",
)
@click.option(
"--resolution",
type=MusicVideoResolution,
default=downloader_music_video_sig.parameters["resolution"].default,
help="Target video resolution for music videos.",
)
# DownloaderPost specific options
@click.option(
"--quality-post",
type=PostQuality,
default=downloader_post_sig.parameters["quality"].default,
help="Post video quality.",
)
# This option should always be last
@click.option(
"--no-config-file",
"-n",
is_flag=True,
callback=load_config_file,
help="Do not use a config file.",
)
def main(
urls: list[str],
disable_music_video_skip: bool,
read_urls_as_txt: bool,
config_path: Path,
log_level: str,
no_exceptions: bool,
cookies_path: Path,
language: str,
output_path: Path,
temp_path: Path,
wvd_path: Path,
overwrite: bool,
save_cover: bool,
save_playlist: bool,
no_synced_lyrics: bool,
synced_lyrics_only: bool,
nm3u8dlre_path: str,
mp4decrypt_path: str,
ffmpeg_path: str,
mp4box_path: str,
download_mode: DownloadMode,
remux_mode: RemuxMode,
cover_format: CoverFormat,
template_folder_album: str,
template_folder_compilation: str,
template_file_single_disc: str,
template_file_multi_disc: str,
template_folder_no_album: str,
template_file_no_album: str,
template_file_playlist: str,
template_date: str,
exclude_tags: list[str],
cover_size: int,
truncate: int,
database_path: Path,
codec_song: SongCodec,
synced_lyrics_format: SyncedLyricsFormat,
codec_music_video: list[MusicVideoCodec],
remux_format_music_video: RemuxFormatMusicVideo,
resolution: MusicVideoResolution,
quality_post: PostQuality,
no_config_file: bool,
):
colorama.just_fix_windows_console()
logger.setLevel(log_level)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomLoggerFormatter())
logger.addHandler(stream_handler)
cookies_path = prompt_path(True, cookies_path, "Cookies file")
if wvd_path:
wvd_path = prompt_path(True, wvd_path, ".wvd file")
logger.info("Starting Gamdl")
apple_music_api = AppleMusicApi.from_netscape_cookies(
cookies_path,
language,
)
if not apple_music_api.account_info["meta"]["subscription"]["active"]:
logger.critical(
"No active Apple Music subscription found, you won't be able to download"
" anything"
)
return
if apple_music_api.account_info["data"][0]["attributes"].get("restrictions"):
logger.warning(
"Your account has content restrictions enabled, some content may not be"
" downloadable"
)
itunes_api = ItunesApi(
apple_music_api.storefront,
apple_music_api.language,
)
downloader = Downloader(
apple_music_api,
itunes_api,
output_path,
temp_path,
wvd_path,
overwrite,
save_cover,
save_playlist,
no_synced_lyrics,
synced_lyrics_only,
nm3u8dlre_path,
mp4decrypt_path,
ffmpeg_path,
mp4box_path,
download_mode,
remux_mode,
cover_format,
template_folder_album,
template_folder_compilation,
template_file_single_disc,
template_file_multi_disc,
template_folder_no_album,
template_file_no_album,
template_file_playlist,
template_date,
exclude_tags,
cover_size,
truncate,
database_path,
log_level in ("WARNING", "ERROR"),
)
downloader_song = DownloaderSong(
downloader,
codec_song,
synced_lyrics_format,
)
downloader_music_video = DownloaderMusicVideo(
downloader,
codec_music_video,
remux_format_music_video,
resolution,
)
downloader_post = DownloaderPost(
downloader,
quality_post,
)
skip_mv = False
if not synced_lyrics_only:
logger.debug("Setting up CDM")
downloader.set_cdm()
if not downloader.ffmpeg_path_full and (
remux_mode == RemuxMode.FFMPEG or download_mode == DownloadMode.NM3U8DLRE
):
logger.critical(X_NOT_FOUND_STRING.format("ffmpeg", ffmpeg_path))
return
if not downloader.mp4box_path_full and remux_mode == RemuxMode.MP4BOX:
logger.critical(X_NOT_FOUND_STRING.format("MP4Box", mp4box_path))
return
if (
not downloader.mp4decrypt_path_full
and codec_song
not in (
SongCodec.AAC_LEGACY,
SongCodec.AAC_HE_LEGACY,
)
or (remux_mode == RemuxMode.MP4BOX and not downloader.mp4decrypt_path_full)
):
logger.critical(X_NOT_FOUND_STRING.format("mp4decrypt", mp4decrypt_path))
return
if (
download_mode == DownloadMode.NM3U8DLRE
and not downloader.nm3u8dlre_path_full
):
logger.critical(X_NOT_FOUND_STRING.format("N_m3u8DL-RE", nm3u8dlre_path))
return
if not downloader.mp4decrypt_path_full:
logger.warning(
X_NOT_FOUND_STRING.format("mp4decrypt", mp4decrypt_path)
+ ", music videos will not be downloaded"
)
skip_mv = True
if not codec_song.is_legacy():
logger.warning(
"You have chosen an experimental song codec. "
"They're not guaranteed to work due to API limitations."
)
if read_urls_as_txt:
_urls = []
for url in urls:
if Path(url).exists():
_urls.extend(Path(url).read_text(encoding="utf-8").splitlines())
urls = _urls
error_count = 0
for url_index, url in enumerate(urls, start=1):
url_progress = color_text(f"URL {url_index}/{len(urls)}", colorama.Style.DIM)
try:
logger.info(f'({url_progress}) Processing "{url}"')
url_info = downloader.parse_url_info(url)
if not url_info:
error_count += 1
logger.error(f"({url_progress}) Invalid URL, skipping")
continue
download_queue = downloader.get_download_queue(url_info)
if not download_queue:
error_count += 1
logger.error(f"({url_progress}) Media not found, skipping")
continue
download_queue_medias_metadata = download_queue.medias_metadata
except Exception as e:
error_count += 1
logger.error(
f'({url_progress}) Failed to process URL "{url}", skipping',
exc_info=not no_exceptions,
)
continue
for download_index, media_metadata in enumerate(
download_queue_medias_metadata,
start=1,
):
queue_progress = color_text(
f"Track {download_index}/{len(download_queue_medias_metadata)} from URL {url_index}/{len(urls)}",
colorama.Style.DIM,
)
try:
logger.info(
f'({queue_progress}) "{media_metadata["attributes"]["name"]}"'
)
if (
(
synced_lyrics_only
and media_metadata["type"] not in {"songs", "library-songs"}
)
or (media_metadata["type"] == "music-videos" and skip_mv)
or (
media_metadata["type"] == "music-videos"
and url_info.type == "album"
and not disable_music_video_skip
)
):
logger.warning(
f"({queue_progress}) Track is not downloadable with current configuration, skipping"
)
continue
if media_metadata["type"] in {"songs", "library-songs"}:
for _ in downloader_song.download(
media_metadata=media_metadata,
playlist_attributes=download_queue.playlist_attributes,
playlist_track=download_index,
):
pass
if media_metadata["type"] in {"music-videos", "library-music-videos"}:
for _ in downloader_music_video.download(
media_metadata=media_metadata,
playlist_attributes=download_queue.playlist_attributes,
playlist_track=download_index,
):
pass
if media_metadata["type"] == "uploaded-videos":
for _ in downloader_post.download(
media_metadata=media_metadata,
):
pass
except KeyboardInterrupt:
exit(0)
except (
MediaNotStreamableException,
MediaFileAlreadyExistsException,
MediaFormatNotAvailableException,
) as e:
logger.warning(
f"({queue_progress}) {e}, skipping",
)
except Exception as e:
error_count += 1
logger.error(
f'({queue_progress}) Failed to download "{media_metadata["attributes"]["name"]}"',
exc_info=not no_exceptions,
)
logger.info(f"Done, {error_count} error(s) occurred")
-101
View File
@@ -1,101 +0,0 @@
from __future__ import annotations
import configparser
from enum import Enum
from pathlib import Path
import click
import typing
class ConfigFile:
def __init__(
self,
config_path: Path,
section_name: str = "gamdl",
) -> None:
self.config_path = config_path
self.section_name = section_name
self._read_config_file()
def _read_config_file(self) -> None:
self.config = configparser.ConfigParser(interpolation=None)
if self.config_path.exists():
self.config.read(self.config_path, encoding="utf-8")
else:
self.config_path.parent.mkdir(parents=True, exist_ok=True)
if not self.config.has_section(self.section_name):
self.config.add_section(self.section_name)
def _write_config_file(self) -> None:
with self.config_path.open("w", encoding="utf-8") as config_file:
self.config.write(config_file)
def _serialize_param_default(self, param: click.Parameter) -> str:
if not isinstance(param.default, (list, tuple)):
param_default = [param.default]
else:
param_default = param.default
if not param_default:
return ""
first = param_default[0]
if isinstance(first, Enum):
return ",".join(str(item.value) for item in param_default)
if isinstance(first, bool):
return ",".join(str(item).lower() for item in param_default)
if first is None:
return "null"
return ",".join(str(item) for item in param_default)
def _add_param_default_to_config(
self,
param: click.Parameter,
) -> bool:
if self.config[self.section_name].get(param.name):
return False
value = self._serialize_param_default(param)
self.config[self.section_name][param.name] = value
return True
def _parse_param_from_config(
self,
param: click.Parameter,
) -> typing.Any:
value = self.config[self.section_name].get(param.name)
if value == "null":
return None
return param.type_cast_value(None, value)
def add_params_default_to_config(
self,
params: list[click.Parameter],
) -> None:
has_changes = False
for param in params:
has_changes = self._add_param_default_to_config(param) or has_changes
if has_changes:
self._write_config_file()
def parse_params_from_config(
self,
params: list[click.Parameter],
) -> dict[str, typing.Any]:
parsed_params = {}
for param in params:
parsed_params[param.name] = self._parse_param_from_config(param)
return parsed_params
-169
View File
@@ -1,169 +0,0 @@
STOREFRONT_IDS = {
"AE": "143481-2,32",
"AG": "143540-2,32",
"AI": "143538-2,32",
"AL": "143575-2,32",
"AM": "143524-2,32",
"AO": "143564-2,32",
"AR": "143505-28,32",
"AT": "143445-4,32",
"AU": "143460-27,32",
"AZ": "143568-2,32",
"BB": "143541-2,32",
"BE": "143446-2,32",
"BF": "143578-2,32",
"BG": "143526-2,32",
"BH": "143559-2,32",
"BJ": "143576-2,32",
"BM": "143542-2,32",
"BN": "143560-2,32",
"BO": "143556-28,32",
"BR": "143503-15,32",
"BS": "143539-2,32",
"BT": "143577-2,32",
"BW": "143525-2,32",
"BY": "143565-2,32",
"BZ": "143555-2,32",
"CA": "143455-6,32",
"CG": "143582-2,32",
"CH": "143459-57,32",
"CL": "143483-28,32",
"CN": "143465-19,32",
"CO": "143501-28,32",
"CR": "143495-28,32",
"CV": "143580-2,32",
"CY": "143557-2,32",
"CZ": "143489-2,32",
"DE": "143443-4,32",
"DK": "143458-2,32",
"DM": "143545-2,32",
"DO": "143508-28,32",
"DZ": "143563-2,32",
"EC": "143509-28,32",
"EE": "143518-2,32",
"EG": "143516-2,32",
"ES": "143454-8,32",
"FI": "143447-2,32",
"FJ": "143583-2,32",
"FM": "143591-2,32",
"FR": "143442-3,32",
"GB": "143444-2,32",
"GD": "143546-2,32",
"GH": "143573-2,32",
"GM": "143584-2,32",
"GR": "143448-2,32",
"GT": "143504-28,32",
"GW": "143585-2,32",
"GY": "143553-2,32",
"HK": "143463-45,32",
"HN": "143510-28,32",
"HR": "143494-2,32",
"HU": "143482-2,32",
"ID": "143476-2,32",
"IE": "143449-2,32",
"IL": "143491-2,32",
"IN": "143467-2,32",
"IS": "143558-2,32",
"IT": "143450-7,32",
"JM": "143511-2,32",
"JO": "143528-2,32",
"JP": "143462-9,32",
"KE": "143529-2,32",
"KG": "143586-2,32",
"KH": "143579-2,32",
"KN": "143548-2,32",
"KR": "143466-13,32",
"KW": "143493-2,32",
"KY": "143544-2,32",
"KZ": "143517-2,32",
"LA": "143587-2,32",
"LB": "143497-2,32",
"LC": "143549-2,32",
"LK": "143486-2,32",
"LR": "143588-2,32",
"LT": "143520-2,32",
"LU": "143451-2,32",
"LV": "143519-2,32",
"MD": "143523-2,32",
"MG": "143531-2,32",
"MK": "143530-2,32",
"ML": "143532-2,32",
"MN": "143592-2,32",
"MO": "143515-45,32",
"MR": "143590-2,32",
"MS": "143547-2,32",
"MT": "143521-2,32",
"MU": "143533-2,32",
"MW": "143589-2,32",
"MX": "143468-28,32",
"MY": "143473-2,32",
"MZ": "143593-2,32",
"NA": "143594-2,32",
"NE": "143534-2,32",
"NG": "143561-2,32",
"NI": "143512-28,32",
"NL": "143452-10,32",
"NO": "143457-2,32",
"NP": "143484-2,32",
"NZ": "143461-27,32",
"OM": "143562-2,32",
"PA": "143485-28,32",
"PE": "143507-28,32",
"PG": "143597-2,32",
"PH": "143474-2,32",
"PK": "143477-2,32",
"PL": "143478-2,32",
"PT": "143453-24,32",
"PW": "143595-2,32",
"PY": "143513-28,32",
"QA": "143498-2,32",
"RO": "143487-2,32",
"RU": "143469-16,32",
"SA": "143479-2,32",
"SB": "143601-2,32",
"SC": "143599-2,32",
"SE": "143456-17,32",
"SG": "143464-19,32",
"SI": "143499-2,32",
"SK": "143496-2,32",
"SL": "143600-2,32",
"SN": "143535-2,32",
"SR": "143554-2,32",
"ST": "143598-2,32",
"SV": "143506-28,32",
"SZ": "143602-2,32",
"TC": "143552-2,32",
"TD": "143581-2,32",
"TH": "143475-2,32",
"TJ": "143603-2,32",
"TM": "143604-2,32",
"TN": "143536-2,32",
"TR": "143480-2,32",
"TT": "143551-2,32",
"TW": "143470-18,32",
"TZ": "143572-2,32",
"UA": "143492-2,32",
"UG": "143537-2,32",
"US": "143441-1,32",
"UY": "143514-2,32",
"UZ": "143566-2,32",
"VC": "143550-2,32",
"VE": "143502-28,32",
"VG": "143543-2,32",
"VN": "143471-2,32",
"YE": "143571-2,32",
"ZA": "143472-2,32",
"ZW": "143605-2,32",
}
EXCLUDED_CONFIG_FILE_PARAMS = (
"urls",
"config_path",
"read_urls_as_txt",
"no_config_file",
"version",
"help",
)
X_NOT_FOUND_STRING = '{} not found at "{}"'
-24
View File
@@ -1,24 +0,0 @@
import logging
import colorama
from .utils import color_text
class CustomLoggerFormatter(logging.Formatter):
base_format = "[%(levelname)-8s %(asctime)s]"
format_colors = {
logging.DEBUG: colorama.Style.DIM,
logging.INFO: colorama.Fore.GREEN,
logging.WARNING: colorama.Fore.YELLOW,
logging.ERROR: colorama.Fore.RED,
logging.CRITICAL: colorama.Fore.RED,
}
date_format = "%H:%M:%S"
def format(self, record: logging.LogRecord) -> str:
return logging.Formatter(
color_text(self.base_format, self.format_colors.get(record.levelno))
+ " %(message)s",
datefmt=self.date_format,
).format(record)
-50
View File
@@ -1,50 +0,0 @@
import sqlite3
from pathlib import Path
class Database:
INITIAL_QUERY = """
CREATE TABLE IF NOT EXISTS media (
media_id TEXT PRIMARY KEY,
media_path TEXT NOT NULL
)
"""
ADD_MEDIA_QUERY = """
INSERT OR REPLACE INTO media (media_id, media_path) VALUES (?, ?)
"""
GET_MEDIA_QUERY = """
SELECT media_path FROM media WHERE media_id = ?
"""
def __init__(self, file_path: Path):
self.file_path = file_path
self._initialize_db()
def _initialize_db(self):
self.file_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(self.file_path) as conn:
conn.execute(self.INITIAL_QUERY)
conn.commit()
def add_media(self, media_id: str, media_path: Path):
with sqlite3.connect(self.file_path) as conn:
conn.execute(
self.ADD_MEDIA_QUERY,
(
media_id,
str(media_path.absolute()),
),
)
conn.commit()
def get_media(self, media_id: str) -> Path | None:
with sqlite3.connect(self.file_path) as conn:
cursor = conn.execute(
self.GET_MEDIA_QUERY,
(media_id,),
)
result = cursor.fetchone()
if result:
return Path(result[0])
return None
-818
View File
@@ -1,818 +0,0 @@
from __future__ import annotations
import base64
import datetime
import functools
import io
import logging
import re
import shutil
import subprocess
import typing
import urllib.parse
import uuid
from pathlib import Path
import colorama
import requests
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from mutagen.mp4 import MP4, MP4Cover
from PIL import Image
from pywidevine import PSSH, Cdm, Device
from yt_dlp import YoutubeDL
from .apple_music_api import AppleMusicApi
from .database import Database
from .enums import CoverFormat, DownloadMode, MediaFileFormat, RemuxMode
from .hardcoded_wvd import HARDCODED_WVD
from .itunes_api import ItunesApi
from .models import (
DecryptionKey,
DownloadInfo,
DownloadQueue,
MediaTags,
PlaylistTags,
UrlInfo,
)
from .utils import color_text, raise_response_exception
logger = logging.getLogger("gamdl")
class Downloader:
ILLEGAL_CHARS_RE = r'[\\/:*?"<>|;]'
ILLEGAL_CHAR_REPLACEMENT = "_"
VALID_URL_RE = (
r"("
r"/(?P<storefront>[a-z]{2})"
r"/(?P<type>artist|album|playlist|song|music-video|post)"
r"(?:/(?P<slug>[^\s/]+))?"
r"/(?P<id>[0-9]+|pl\.[0-9a-z]{32}|pl\.u-[a-zA-Z0-9]+)"
r"(?:\?i=(?P<sub_id>[0-9]+))?"
r")|("
r"(?:/(?P<library_storefront>[a-z]{2}))?"
r"/library/(?P<library_type>|playlist|albums)"
r"/(?P<library_id>p\.[a-zA-Z0-9]{15}|l\.[a-zA-Z0-9]{7})"
r")"
)
IMAGE_FILE_EXTENSION_MAP = {
"jpeg": ".jpg",
"tiff": ".tif",
}
def __init__(
self,
apple_music_api: AppleMusicApi,
itunes_api: ItunesApi,
output_path: Path = Path("./Apple Music"),
temp_path: Path = Path("."),
wvd_path: Path = None,
overwrite: bool = False,
save_cover: bool = False,
save_playlist: bool = False,
no_synced_lyrics: bool = False,
synced_lyrics_only: bool = False,
nm3u8dlre_path: str = "N_m3u8DL-RE",
mp4decrypt_path: str = "mp4decrypt",
ffmpeg_path: str = "ffmpeg",
mp4box_path: str = "MP4Box",
download_mode: DownloadMode = DownloadMode.YTDLP,
remux_mode: RemuxMode = RemuxMode.FFMPEG,
cover_format: CoverFormat = CoverFormat.JPG,
template_folder_album: str = "{album_artist}/{album}",
template_folder_compilation: str = "Compilations/{album}",
template_file_single_disc: str = "{track:02d} {title}",
template_file_multi_disc: str = "{disc}-{track:02d} {title}",
template_folder_no_album: str = "{artist}/Unknown Album",
template_file_no_album: str = "{title}",
template_file_playlist: str = "Playlists/{playlist_artist}/{playlist_title}",
template_date: str = "%Y-%m-%dT%H:%M:%SZ",
exclude_tags: list[str] = None,
cover_size: int = 1200,
truncate: int = None,
database_path: Path = None,
silent: bool = False,
skip_processing: bool = False,
):
self.apple_music_api = apple_music_api
self.itunes_api = itunes_api
self.output_path = output_path
self.temp_path = temp_path
self.wvd_path = wvd_path
self.overwrite = overwrite
self.save_cover = save_cover
self.save_playlist = save_playlist
self.no_synced_lyrics = no_synced_lyrics
self.synced_lyrics_only = synced_lyrics_only
self.nm3u8dlre_path = nm3u8dlre_path
self.mp4decrypt_path = mp4decrypt_path
self.ffmpeg_path = ffmpeg_path
self.mp4box_path = mp4box_path
self.download_mode = download_mode
self.remux_mode = remux_mode
self.cover_format = cover_format
self.template_folder_album = template_folder_album
self.template_folder_compilation = template_folder_compilation
self.template_file_single_disc = template_file_single_disc
self.template_file_multi_disc = template_file_multi_disc
self.template_folder_no_album = template_folder_no_album
self.template_file_no_album = template_file_no_album
self.template_file_playlist = template_file_playlist
self.template_date = template_date
self.exclude_tags = exclude_tags
self.cover_size = cover_size
self.truncate = truncate
self.database_path = database_path
self.silent = silent
self.skip_processing = skip_processing
self._set_temp_path()
self._set_exclude_tags()
self._set_binaries_path_full()
self._set_truncate()
self._set_database()
self._set_subprocess_additional_args()
def _set_temp_path(self):
random_suffix = uuid.uuid4().hex[:8]
self.temp_path_generated = self.temp_path / f"gamdl_temp_{random_suffix}"
def _set_exclude_tags(self):
self.exclude_tags = self.exclude_tags if self.exclude_tags is not None else []
def _set_binaries_path_full(self):
self.nm3u8dlre_path_full = shutil.which(self.nm3u8dlre_path)
self.ffmpeg_path_full = shutil.which(self.ffmpeg_path)
self.mp4box_path_full = shutil.which(self.mp4box_path)
self.mp4decrypt_path_full = shutil.which(self.mp4decrypt_path)
def _set_truncate(self):
if self.truncate is not None:
self.truncate = None if self.truncate < 4 else self.truncate
def _set_database(self):
if self.database_path is not None:
self.database = Database(self.database_path)
else:
self.database = None
def _set_subprocess_additional_args(self):
if self.silent:
self.subprocess_additional_args = {
"stdout": subprocess.DEVNULL,
"stderr": subprocess.DEVNULL,
}
else:
self.subprocess_additional_args = {}
def set_cdm(self):
if self.wvd_path:
self.cdm = Cdm.from_device(Device.load(self.wvd_path))
else:
self.cdm = Cdm.from_device(Device.loads(HARDCODED_WVD))
def parse_url_info(self, url: str) -> UrlInfo | None:
url = urllib.parse.unquote(url)
url_regex_result = re.search(
self.VALID_URL_RE,
url,
)
if not url_regex_result:
return None
return UrlInfo(
**url_regex_result.groupdict(),
)
def get_download_queue(self, url_info: UrlInfo) -> DownloadQueue:
return self._get_download_queue(
"song" if url_info.sub_id else url_info.type,
url_info.sub_id or url_info.id or url_info.library_id,
url_info.library_id is not None,
)
def _get_download_queue(
self,
url_type: str,
id: str,
is_library: bool,
) -> DownloadQueue | None:
download_queue = DownloadQueue()
if url_type == "artist":
artist = self.apple_music_api.get_artist(id)
if artist is None:
return None
download_queue.medias_metadata = list(
self.get_download_queue_from_artist(artist)
)
if url_type == "song":
song = self.apple_music_api.get_song(id)
if song is None:
return None
download_queue.medias_metadata = [song]
if url_type in {"album", "albums"}:
if is_library:
album = self.apple_music_api.get_library_album(id)
else:
album = self.apple_music_api.get_album(id)
if album is None:
return None
download_queue.medias_metadata = [
track for track in album["relationships"]["tracks"]["data"]
]
if url_type == "playlist":
if is_library:
playlist = self.apple_music_api.get_library_playlist(id)
else:
playlist = self.apple_music_api.get_playlist(id)
if playlist is None:
return None
download_queue.medias_metadata = [
track for track in playlist["relationships"]["tracks"]["data"]
]
download_queue.playlist_attributes = playlist["attributes"]
if url_type == "music-video":
music_video = self.apple_music_api.get_music_video(id)
if music_video is None:
return None
download_queue.medias_metadata = [music_video]
if url_type == "post":
post = self.apple_music_api.get_post(id)
if post is None:
return None
download_queue.medias_metadata = [post]
return download_queue
def get_download_queue_from_artist(
self,
artist: dict,
) -> typing.Generator[dict, None, None]:
media_type = inquirer.select(
message=f'Select which type to download for artist "{artist["attributes"]["name"]}":',
choices=[
Choice(name="Albums", value="albums"),
Choice(
name="Music Videos",
value="music-videos",
),
],
validate=lambda result: artist["relationships"].get(result, {}).get("data"),
invalid_message="The artist doesn't have any items of this type",
).execute()
if media_type == "albums":
yield from self.select_albums_from_artist(
artist["relationships"]["albums"]["data"]
)
elif media_type == "music-videos":
yield from self.select_music_videos_from_artist(
artist["relationships"]["music-videos"]["data"]
)
def select_albums_from_artist(
self,
albums: list[dict],
) -> typing.Generator[dict, None, None]:
choices = [
Choice(
name=" | ".join(
[
f'{album["attributes"]["trackCount"]:03d}',
f'{album["attributes"]["releaseDate"]:<10}',
f'{album["attributes"].get("contentRating", "None").title():<8}',
f'{album["attributes"]["name"]}',
]
),
value=album,
)
for album in albums
]
selected = inquirer.select(
message="Select which albums to download: (Track Count | Release Date | Rating | Title)",
choices=choices,
multiselect=True,
).execute()
for album in selected:
for track in self.apple_music_api.get_album(album["id"])["relationships"][
"tracks"
]["data"]:
yield track
def select_music_videos_from_artist(
self,
music_videos: list[dict],
) -> typing.Generator[dict, None, None]:
choices = [
Choice(
name=" | ".join(
[
self.millis_to_min_sec(
music_video["attributes"]["durationInMillis"]
),
f'{music_video["attributes"].get("contentRating", "None").title():<8}',
music_video["attributes"]["name"],
],
),
value=music_video,
)
for music_video in music_videos
]
selected = inquirer.select(
message="Select which music videos to download: (Duration | Rating | Title)",
choices=choices,
multiselect=True,
).execute()
for music_video in selected:
yield music_video
def get_media_id_of_library_media(
self,
library_media_metadata: dict,
) -> str:
play_params = library_media_metadata["attributes"].get("playParams", {})
return play_params.get("catalogId", library_media_metadata["id"])
def is_media_streamable(
self,
media_metadata: dict,
) -> bool:
return bool(media_metadata["attributes"].get("playParams"))
def get_database_final_path(self, media_id: str) -> Path | None:
if self.database is None:
return
final_path_database = self.database.get_media(media_id)
if (
final_path_database is not None
and final_path_database.exists()
and not self.overwrite
):
return final_path_database
def get_playlist_tags(
self,
playlist_attributes: dict,
playlist_track: int,
) -> PlaylistTags:
return PlaylistTags(
playlist_artist=playlist_attributes.get("curatorName", "Unknown"),
playlist_id=playlist_attributes["playParams"]["id"],
playlist_title=playlist_attributes["name"],
playlist_track=playlist_track,
)
def get_playlist_file_path(
self,
tags: PlaylistTags,
) -> Path:
template_file = self.template_file_playlist.split("/")
tags_dict = tags.__dict__.copy()
return Path(
self.output_path,
*[
self.get_sanitized_string(i.format(**tags_dict), True)
for i in template_file[0:-1]
],
*[
self.get_sanitized_string(template_file[-1].format(**tags_dict), False)
+ ".m3u8"
],
)
def update_playlist_file(
self,
playlist_file_path: Path,
final_path: Path,
playlist_track: int,
):
playlist_file_path.parent.mkdir(parents=True, exist_ok=True)
playlist_file_path_parent_parts_len = len(playlist_file_path.parent.parts)
output_path_parts_len = len(self.output_path.parts)
final_path_relative = Path(
("../" * (playlist_file_path_parent_parts_len - output_path_parts_len)),
*final_path.parts[output_path_parts_len:],
)
playlist_file_lines = (
playlist_file_path.open("r", encoding="utf8").readlines()
if playlist_file_path.exists()
else []
)
if len(playlist_file_lines) < playlist_track:
playlist_file_lines.extend(
"\n" for _ in range(playlist_track - len(playlist_file_lines))
)
playlist_file_lines[playlist_track - 1] = final_path_relative.as_posix() + "\n"
with playlist_file_path.open("w", encoding="utf8") as playlist_file:
playlist_file.writelines(playlist_file_lines)
@staticmethod
def millis_to_min_sec(millis) -> str:
minutes, seconds = divmod(millis // 1000, 60)
return f"{minutes:02d}:{seconds:02d}"
def parse_date(self, date: str) -> datetime.datetime:
return datetime.datetime.fromisoformat(date.split("Z")[0])
def get_decryption_key(self, pssh: str, track_id: str) -> DecryptionKey:
try:
cdm_session = self.cdm.open()
pssh_obj = PSSH(pssh.split(",")[-1])
challenge = base64.b64encode(
self.cdm.get_license_challenge(cdm_session, pssh_obj)
).decode()
license = self.apple_music_api.get_widevine_license(
track_id,
pssh,
challenge,
)
self.cdm.parse_license(cdm_session, license)
decryption_key_info = next(
i for i in self.cdm.get_keys(cdm_session) if i.type == "CONTENT"
)
finally:
self.cdm.close(cdm_session)
return DecryptionKey(
key=decryption_key_info.key.hex(),
kid=decryption_key_info.kid.hex,
)
def download(self, path: Path, stream_url: str):
if self.download_mode == DownloadMode.YTDLP:
self.download_ytdlp(path, stream_url)
elif self.download_mode == DownloadMode.NM3U8DLRE:
self.download_nm3u8dlre(path, stream_url)
def download_ytdlp(self, path: Path, stream_url: str):
with YoutubeDL(
{
"quiet": True,
"no_warnings": True,
"outtmpl": str(path),
"allow_unplayable_formats": True,
"fixup": "never",
"allowed_extractors": ["generic"],
"noprogress": self.silent,
}
) as ydl:
ydl.download(stream_url)
def download_nm3u8dlre(self, path: Path, stream_url: str):
path.parent.mkdir(parents=True, exist_ok=True)
subprocess.run(
[
self.nm3u8dlre_path_full,
stream_url,
"--binary-merge",
"--no-log",
"--log-level",
"off",
"--ffmpeg-binary-path",
self.ffmpeg_path_full,
"--save-name",
path.stem,
"--save-dir",
path.parent,
"--tmp-dir",
path.parent,
],
check=True,
**self.subprocess_additional_args,
)
def get_sanitized_string(self, dirty_string: str, is_folder: bool) -> str:
dirty_string = re.sub(
self.ILLEGAL_CHARS_RE,
self.ILLEGAL_CHAR_REPLACEMENT,
dirty_string,
)
if is_folder:
dirty_string = dirty_string[: self.truncate]
if dirty_string.endswith("."):
dirty_string = dirty_string[:-1] + self.ILLEGAL_CHAR_REPLACEMENT
else:
if self.truncate is not None:
dirty_string = dirty_string[: self.truncate - 4]
return dirty_string.strip()
def get_media_file_extension(
self,
media_file_format: MediaFileFormat,
) -> str:
return "." + media_file_format.value
def get_temp_path(
self,
media_id: str,
tag: str,
file_extension: str,
):
temp_path = self.temp_path_generated / (f"{media_id}_{tag}" + file_extension)
return temp_path
def get_final_path(
self,
tags: MediaTags,
file_extension: str,
playlist_tags: PlaylistTags,
) -> Path:
if tags.album is not None:
template_folder = (
self.template_folder_compilation.split("/")
if tags.compilation
else self.template_folder_album.split("/")
)
template_file = (
self.template_file_multi_disc.split("/")
if tags.disc_total > 1
else self.template_file_single_disc.split("/")
)
else:
template_folder = self.template_folder_no_album.split("/")
template_file = self.template_file_no_album.split("/")
template_final = template_folder + template_file
tags_dict = tags.__dict__.copy()
if playlist_tags:
tags_dict.update(playlist_tags.__dict__)
return Path(
self.output_path,
*[
self.get_sanitized_string(i.format(**tags_dict), True)
for i in template_final[0:-1]
],
(
self.get_sanitized_string(template_final[-1].format(**tags_dict), False)
+ file_extension
),
)
def get_cover_format(self, cover_url: str) -> str | None:
cover_bytes = self.get_cover_bytes(cover_url)
if cover_bytes is None:
return None
image_obj = Image.open(io.BytesIO(self.get_cover_bytes(cover_url)))
image_format = image_obj.format.lower()
return image_format
def get_cover_file_extension(self, cover_format: str) -> str:
return self.IMAGE_FILE_EXTENSION_MAP.get(
cover_format,
f".{cover_format.lower()}",
)
def get_cover_url(self, metadata: dict) -> str:
if self.cover_format == CoverFormat.RAW:
return self._get_raw_cover_url(metadata["attributes"]["artwork"]["url"])
return self._get_cover_url(metadata["attributes"]["artwork"]["url"])
def _get_raw_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"image/thumb/",
"",
re.sub(
r"is1-ssl",
"a1",
re.sub(
r"/\{w\}x\{h\}([a-z]{2})\.jpg",
"",
cover_url_template,
),
),
)
def _get_cover_url(self, cover_url_template: str) -> str:
return re.sub(
r"\{w\}x\{h\}([a-z]{2})\.jpg",
f"{self.cover_size}x{self.cover_size}bb.{self.cover_format.value}",
cover_url_template,
)
@staticmethod
@functools.lru_cache()
def get_cover_bytes(url: str) -> bytes | None:
response = requests.get(url)
if response.status_code == 200:
return response.content
elif response.status_code in (404, 400):
return None
else:
raise_response_exception(response)
return response.content
def apply_tags(
self,
path: Path,
tags: MediaTags,
cover_url: str,
):
filtered_tags = MediaTags(
**{
k: v
for k, v in tags.__dict__.items()
if v is not None and k not in self.exclude_tags
}
)
mp4_tags = filtered_tags.to_mp4_tags(self.template_date)
skip_tagging = "all" in self.exclude_tags
mp4 = MP4(path)
mp4.clear()
if not skip_tagging:
if (
"cover" not in self.exclude_tags
and self.cover_format != CoverFormat.RAW
):
self._apply_cover(mp4, cover_url)
mp4.update(mp4_tags)
mp4.save()
def _apply_cover(
self,
mp4: MP4,
cover_url: str,
) -> None:
cover_bytes = self.get_cover_bytes(cover_url)
if cover_bytes is None:
return
mp4["covr"] = [
MP4Cover(
data=cover_bytes,
imageformat=(
MP4Cover.FORMAT_JPEG
if self.cover_format == CoverFormat.JPG
else MP4Cover.FORMAT_PNG
),
)
]
def move_to_output_path(
self,
staged_path: Path,
final_path: Path,
):
final_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(staged_path, final_path)
@functools.lru_cache()
def write_cover(self, cover_path: Path, cover_url: str):
cover_path.parent.mkdir(parents=True, exist_ok=True)
cover_path.write_bytes(self.get_cover_bytes(cover_url))
def write_synced_lyrics(
self,
synced_lyrics_path: Path,
synced_lyrics: str,
):
synced_lyrics_path.parent.mkdir(parents=True, exist_ok=True)
synced_lyrics_path.write_text(
synced_lyrics,
encoding="utf8",
)
def cleanup_temp_path(self) -> None:
if self.temp_path_generated.exists():
shutil.rmtree(self.temp_path_generated)
def _final_processing_wrapper(
self,
func,
*args,
**kwargs,
) -> typing.Generator[DownloadInfo, None, None]:
exception = None
download_info = None
try:
for download_info in func(*args, **kwargs):
yield download_info
except Exception as e:
exception = e
finally:
if download_info is not None and isinstance(download_info, DownloadInfo):
self._final_processing(
download_info,
)
if exception is not None:
raise exception
def _final_processing(
self,
download_info: DownloadInfo,
) -> None:
if self.skip_processing:
return
if download_info.media_id:
colored_media_id = color_text(
download_info.media_id,
colorama.Style.DIM,
)
else:
colored_media_id = color_text(
"Unknown",
colorama.Style.DIM,
)
if download_info.staged_path:
logger.debug(
f'[{colored_media_id}] Applying tags to "{download_info.staged_path}"'
)
self.apply_tags(
download_info.staged_path,
download_info.tags,
download_info.cover_url,
)
logger.debug(
f'[{colored_media_id}] Moving "{download_info.staged_path}" to "{download_info.final_path}"'
)
self.move_to_output_path(
download_info.staged_path,
download_info.final_path,
)
logger.info(f"[{colored_media_id}] Download completed successfully")
if self.database is not None:
logger.debug(
f'[{colored_media_id}] Adding entry to database at "{self.database_path}"'
)
self.database.add_media(
download_info.media_id,
download_info.final_path,
)
if (
download_info.cover_path and not self.save_cover
) or not download_info.cover_path:
pass
elif download_info.cover_path.exists() and not self.overwrite:
logger.debug(
f'[{colored_media_id}] Cover already exists at "{download_info.cover_path}", skipping'
)
else:
logger.debug(
f'[{colored_media_id}] Saving cover to "{download_info.cover_path}"'
)
self.write_cover(
download_info.cover_path,
download_info.cover_url,
)
if (
self.no_synced_lyrics
or not download_info.lyrics
or not download_info.lyrics.synced
):
pass
elif download_info.synced_lyrics_path.exists() and not self.overwrite:
logger.debug(
f'[{colored_media_id}] Synced lyrics already exist at "{download_info.synced_lyrics_path}", skipping'
)
else:
logger.debug(
f'[{colored_media_id}] Saving synced lyrics to "{download_info.synced_lyrics_path}"'
)
self.write_synced_lyrics(
download_info.synced_lyrics_path,
download_info.lyrics.synced,
)
if download_info.playlist_tags and self.save_playlist:
playlist_file_path = self.get_playlist_file_path(
download_info.playlist_tags
)
logger.debug(
f'[{colored_media_id}] Updating playlist file "{playlist_file_path}"'
)
self.update_playlist_file(
playlist_file_path,
download_info.final_path,
download_info.playlist_tags.playlist_track,
)
self.cleanup_temp_path()
-615
View File
@@ -1,615 +0,0 @@
from __future__ import annotations
import logging
import subprocess
import urllib.parse
from pathlib import Path
import colorama
import m3u8
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from .downloader import Downloader
from .enums import (
MediaFileFormat,
MusicVideoCodec,
MusicVideoResolution,
RemuxFormatMusicVideo,
RemuxMode,
)
from .exceptions import *
from .models import (
DecryptionKeyAv,
DownloadInfo,
MediaRating,
MediaTags,
MediaType,
StreamInfo,
StreamInfoAv,
)
from .utils import color_text
logger = logging.getLogger("gamdl")
class DownloaderMusicVideo:
MP4_FORMAT_CODECS = ["hvc1", "audio-atmos", "audio-ec3"]
def __init__(
self,
downloader: Downloader,
codec: list[MusicVideoCodec] = [MusicVideoCodec.H264, MusicVideoCodec.H265],
remux_format: RemuxFormatMusicVideo = RemuxFormatMusicVideo.M4V,
resolution: MusicVideoResolution = MusicVideoResolution.R1080P,
) -> None:
self.downloader = downloader
self.codec = codec
self.remux_format = remux_format
self.resolution = resolution
def get_stream_url_from_webplayback(self, webplayback: dict) -> str:
return webplayback["hls-playlist-url"]
def get_stream_url_from_itunes_page(self, itunes_page: dict) -> dict:
stream_url = itunes_page["offers"][0]["assets"][0]["hlsUrl"]
url_parts = urllib.parse.urlparse(stream_url)
query = urllib.parse.parse_qs(url_parts.query, keep_blank_values=True)
query.update({"aec": "HD", "dsid": "1"})
return url_parts._replace(
query=urllib.parse.urlencode(query, doseq=True)
).geturl()
def get_video_playlist_from_resolution(
self,
playlists: list[m3u8.Playlist],
) -> m3u8.Playlist | None:
playlists_filtered = set()
for playlist in playlists:
for codec in self.codec:
if playlist.stream_info.codecs.startswith(codec.fourcc()):
playlists_filtered.add(playlist)
if not playlists_filtered:
return None
playlists_filtered = list(playlists_filtered)
def sort_key(playlist: m3u8.Playlist) -> tuple[int, int, int, int]:
playlist_resolution = playlist.stream_info.resolution[-1]
resolution_difference = abs(playlist_resolution - int(self.resolution))
codec_preference = len(self.codec)
for i, preferred_codec in enumerate(self.codec):
if playlist.stream_info.codecs.startswith(preferred_codec.fourcc()):
codec_preference = i
break
bandwidth = playlist.stream_info.bandwidth
return (
resolution_difference,
codec_preference,
-playlist_resolution,
-bandwidth,
)
playlists_filtered.sort(key=sort_key)
return playlists_filtered[0]
def get_best_stereo_audio_playlist(
self,
playlist_master_data: dict,
) -> dict | None:
audio_playlist = next(
(
media
for media in playlist_master_data["media"]
if media["group_id"] == "audio-stereo-256"
),
None,
)
return audio_playlist
def get_video_playlist_from_user(
self,
playlists: list[m3u8.Playlist],
) -> m3u8.Playlist:
choices = [
Choice(
name=" | ".join(
[
playlist.stream_info.codecs[:4],
"x".join(str(v) for v in playlist.stream_info.resolution),
str(playlist.stream_info.bandwidth),
]
),
value=playlist,
)
for playlist in playlists
]
selected = inquirer.select(
message="Select which video codec to download: (Codec | Resolution | Bitrate)",
choices=choices,
).execute()
return selected
def get_audio_playlist_from_user(
self,
playlist_master_data: dict,
) -> dict:
choices = [
Choice(
name=playlist["group_id"],
value=playlist,
)
for playlist in playlist_master_data["media"]
if playlist.get("uri")
]
selected = inquirer.select(
message="Select which audio codec to download:",
choices=choices,
).execute()
return selected
def get_pssh(self, m3u8_obj: m3u8.M3U8) -> str:
return next(
(
key
for key in m3u8_obj.keys
if key.keyformat == "urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed"
),
None,
).uri
def get_stream_info_video(
self, playlist_master_m3u8_obj: m3u8.M3U8
) -> StreamInfo | None:
stream_info = StreamInfo()
if MusicVideoCodec.ASK not in self.codec:
playlist = self.get_video_playlist_from_resolution(
playlist_master_m3u8_obj.playlists
)
else:
playlist = self.get_video_playlist_from_user(
playlist_master_m3u8_obj.playlists
)
if not playlist:
return None
stream_info.stream_url = playlist.uri
stream_info.codec = playlist.stream_info.codecs
playlist_m3u8_obj = m3u8.load(stream_info.stream_url)
stream_info.widevine_pssh = self.get_pssh(playlist_m3u8_obj)
return stream_info
def get_stream_info_audio(self, playlist_master_data: dict) -> StreamInfo | None:
stream_info = StreamInfo()
if self.codec != MusicVideoCodec.ASK:
playlist = self.get_best_stereo_audio_playlist(playlist_master_data)
else:
playlist = self.get_audio_playlist_from_user(playlist_master_data)
if not playlist:
return None
stream_info.stream_url = playlist["uri"]
stream_info.codec = playlist["group_id"]
playlist_m3u8_obj = m3u8.load(stream_info.stream_url)
stream_info.widevine_pssh = self.get_pssh(playlist_m3u8_obj)
return stream_info
def _get_stream_info(
self,
stream_url: str,
) -> StreamInfoAv | None:
playlist_master_m3u8_obj = m3u8.load(stream_url)
stream_info_video = self.get_stream_info_video(playlist_master_m3u8_obj)
stream_info_audio = self.get_stream_info_audio(playlist_master_m3u8_obj.data)
if not stream_info_video or not stream_info_audio:
return None
use_mp4 = (
any(
stream_info_video.codec.startswith(codec)
for codec in self.MP4_FORMAT_CODECS
)
or any(
stream_info_audio.codec.startswith(codec)
for codec in self.MP4_FORMAT_CODECS
)
or self.remux_format == RemuxFormatMusicVideo.MP4
)
if use_mp4:
file_format = MediaFileFormat.MP4
else:
file_format = MediaFileFormat.M4V
return StreamInfoAv(
video_track=stream_info_video,
audio_track=stream_info_audio,
file_format=file_format,
)
def get_stream_info_from_webplayback(
self,
webplayback: dict,
) -> StreamInfoAv | None:
return self._get_stream_info(self.get_stream_url_from_webplayback(webplayback))
def get_stream_info_from_itunes_page(
self,
itunes_page: dict,
) -> StreamInfoAv | None:
return self._get_stream_info(self.get_stream_url_from_itunes_page(itunes_page))
def get_decryption_key(
self,
stream_info: StreamInfoAv,
media_id: str,
) -> DecryptionKeyAv:
decryption_key_video = self.downloader.get_decryption_key(
stream_info.video_track.widevine_pssh,
media_id,
)
decryption_key_audio = self.downloader.get_decryption_key(
stream_info.audio_track.widevine_pssh,
media_id,
)
return DecryptionKeyAv(
video_track=decryption_key_video,
audio_track=decryption_key_audio,
)
def get_music_video_id_alt(self, metadata: dict) -> str | None:
music_video_url = metadata["attributes"].get("url")
if music_video_url is None:
return None
return music_video_url.split("/")[-1].split("?")[0]
def get_tags(
self,
id_alt: str,
itunes_page: dict,
metadata: dict,
) -> MediaTags:
metadata_itunes = self.downloader.itunes_api.get_resource(id_alt)
explicitness = metadata_itunes[0]["trackExplicitness"]
if explicitness == "notExplicit":
rating = MediaRating.NONE
elif explicitness == "explicit":
rating = MediaRating.EXPLICIT
else:
rating = MediaRating.CLEAN
tags = MediaTags(
artist=metadata_itunes[0]["artistName"],
artist_id=int(metadata_itunes[0]["artistId"]),
copyright=itunes_page.get("copyright"),
date=self.downloader.parse_date(metadata_itunes[0]["releaseDate"]),
genre=metadata_itunes[0]["primaryGenreName"],
genre_id=int(itunes_page["genres"][0]["genreId"]),
media_type=MediaType.MUSIC_VIDEO,
storefront=int(self.downloader.itunes_api.storefront_id.split("-")[0]),
title=metadata_itunes[0]["trackCensoredName"],
title_id=int(metadata["id"]),
rating=rating,
)
if len(metadata_itunes) > 1:
album = self.downloader.apple_music_api.get_album(
itunes_page["collectionId"]
)
if not album:
return tags
tags.album = metadata_itunes[1]["collectionCensoredName"]
tags.album_artist = metadata_itunes[1]["artistName"]
tags.album_id = int(itunes_page["collectionId"])
tags.disc = metadata_itunes[0]["discNumber"]
tags.disc_total = metadata_itunes[0]["discCount"]
tags.compilation = album["attributes"]["isCompilation"]
tags.track = metadata_itunes[0]["trackNumber"]
tags.track_total = metadata_itunes[0]["trackCount"]
return tags
def decrypt(
self,
encrypted_path: Path,
decryption_key: str,
decrypted_path: Path,
) -> None:
subprocess.run(
[
self.downloader.mp4decrypt_path_full,
encrypted_path,
"--key",
f"1:{decryption_key}",
decrypted_path,
],
check=True,
**self.downloader.subprocess_additional_args,
)
def remux_mp4box(
self,
decrypted_path_audio: Path,
decrypted_path_video: Path,
fixed_path: Path,
) -> None:
subprocess.run(
[
self.downloader.mp4box_path_full,
"-quiet",
"-add",
decrypted_path_audio,
"-add",
decrypted_path_video,
"-itags",
"artist=placeholder",
"-keep-utc",
"-new",
fixed_path,
],
check=True,
**self.downloader.subprocess_additional_args,
)
def remux_ffmpeg(
self,
decrypted_path_video: Path,
decrypte_path_audio: Path,
fixed_path: Path,
) -> None:
subprocess.run(
[
self.downloader.ffmpeg_path_full,
"-loglevel",
"error",
"-y",
"-i",
decrypted_path_video,
"-i",
decrypte_path_audio,
"-movflags",
"+faststart",
"-c",
"copy",
"-c:s",
"mov_text",
fixed_path,
],
check=True,
**self.downloader.subprocess_additional_args,
)
def stage(
self,
encrypted_path_video: Path,
encrypted_path_audio: Path,
decrypted_path_video: Path,
decrypted_path_audio: Path,
staged_path: Path,
decryption_key: DecryptionKeyAv,
) -> None:
self.decrypt(
encrypted_path_video,
decryption_key.video_track.key,
decrypted_path_video,
)
self.decrypt(
encrypted_path_audio,
decryption_key.audio_track.key,
decrypted_path_audio,
)
if self.downloader.remux_mode == RemuxMode.MP4BOX:
self.remux_mp4box(
decrypted_path_audio,
decrypted_path_video,
staged_path,
)
elif self.downloader.remux_mode == RemuxMode.FFMPEG:
self.remux_ffmpeg(
decrypted_path_video,
decrypted_path_audio,
staged_path,
)
def get_cover_path(self, final_path: Path, cover_format: str) -> Path:
return final_path.with_suffix(
self.downloader.get_cover_file_extension(cover_format)
)
import typing
def download(
self,
media_id: str = None,
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> typing.Generator[DownloadInfo, None, None]:
yield from self.downloader._final_processing_wrapper(
self._download,
media_id,
media_metadata,
playlist_attributes,
playlist_track,
)
def _download(
self,
media_id: str = None,
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> typing.Generator[DownloadInfo, None, None]:
download_info = DownloadInfo()
yield download_info
if playlist_track is None and playlist_attributes:
raise ValueError(
"playlist_track must be provided if playlist_attributes is provided"
)
if playlist_attributes:
playlist_tags = self.downloader.get_playlist_tags(
playlist_attributes,
playlist_track,
)
else:
playlist_tags = None
download_info.playlist_tags = playlist_tags
if not media_id and not media_metadata:
raise ValueError("Either media_id or media_metadata must be provided")
if media_metadata:
media_id = self.downloader.get_media_id_of_library_media(media_metadata)
download_info.media_id = media_id
colored_media_id = color_text(media_id, colorama.Style.DIM)
database_final_path = self.downloader.get_database_final_path(media_id)
if database_final_path:
download_info.final_path = database_final_path
yield download_info
raise MediaFileAlreadyExistsException(database_final_path)
if not media_metadata:
logger.debug(f"[{colored_media_id}] Getting Music Video metadata")
media_metadata = self.downloader.apple_music_api.get_music_video(media_id)
download_info.media_metadata = media_metadata
if not self.downloader.is_media_streamable(media_metadata):
yield download_info
raise MediaNotStreamableException()
alt_media_id = self.get_music_video_id_alt(media_metadata) or media_id
download_info.alt_media_id = alt_media_id
logger.debug(f"[{colored_media_id}] Getting iTunes page")
itunes_page = self.downloader.itunes_api.get_itunes_page(
"music-video",
alt_media_id,
)
logger.debug(f"[{colored_media_id}] Getting tags")
tags = self.get_tags(
alt_media_id,
itunes_page,
media_metadata,
)
download_info.tags = tags
if alt_media_id == media_id:
logger.debug(f"[{colored_media_id}] Getting stream info")
stream_info = self.get_stream_info_from_itunes_page(itunes_page)
else:
logger.debug(f"[{colored_media_id}] Getting webplayback info")
webplayback = self.downloader.apple_music_api.get_webplayback(media_id)
logger.debug(f"[{colored_media_id}] Getting stream info")
stream_info = self.get_stream_info_from_webplayback(webplayback)
if not stream_info:
yield download_info
raise MediaFormatNotAvailableException()
download_info.stream_info = stream_info
final_path = self.downloader.get_final_path(
tags,
self.downloader.get_media_file_extension(stream_info.file_format),
playlist_tags,
)
download_info.final_path = final_path
cover_url = self.downloader.get_cover_url(media_metadata)
cover_format = self.downloader.get_cover_format(cover_url)
if cover_format and self.downloader.save_cover:
cover_path = self.get_cover_path(final_path, cover_format)
else:
cover_path = None
download_info.cover_url = cover_url
download_info.cover_format = cover_format
download_info.cover_path = cover_path
if final_path.exists() and not self.downloader.overwrite:
yield download_info
raise MediaFileAlreadyExistsException(final_path)
logger.debug(f"[{colored_media_id}] Getting decryption key")
decryption_key = self.get_decryption_key(
stream_info,
media_id,
)
encrypted_path_video = self.downloader.get_temp_path(
media_id,
"encrypted_video",
".mp4",
)
encrypted_path_audio = self.downloader.get_temp_path(
media_id,
"encrypted_audio",
".m4a",
)
decrypted_path_video = self.downloader.get_temp_path(
media_id,
"decrypted_video",
".mp4",
)
decrypted_path_audio = self.downloader.get_temp_path(
media_id,
"decrypted_audio",
".m4a",
)
staged_path = self.downloader.get_temp_path(
media_id,
"staged",
self.downloader.get_media_file_extension(stream_info.file_format),
)
logger.info(f"[{colored_media_id}] Downloading Music Video")
logger.debug(
f'[{colored_media_id}] Downloading video to "{encrypted_path_video}"'
)
self.downloader.download(
encrypted_path_video,
stream_info.video_track.stream_url,
)
logger.debug(
f'[{colored_media_id}] Downloading audio to "{encrypted_path_audio}"'
)
self.downloader.download(
encrypted_path_audio,
stream_info.audio_track.stream_url,
)
logger.debug(
f"[{colored_media_id}] "
"Decrypting video/audio to "
f'{decrypted_path_video}"/"{decrypted_path_audio}" '
f'and remuxing to "{staged_path}"'
)
self.stage(
encrypted_path_video,
encrypted_path_audio,
decrypted_path_video,
decrypted_path_audio,
staged_path,
decryption_key,
)
download_info.staged_path = staged_path
yield download_info
-168
View File
@@ -1,168 +0,0 @@
from __future__ import annotations
import logging
import typing
from pathlib import Path
import colorama
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from .downloader import Downloader
from .enums import PostQuality
from .exceptions import MediaFileAlreadyExistsException, MediaNotStreamableException
from .models import DownloadInfo, MediaTags
from .utils import color_text
logger = logging.getLogger("gamdl")
class DownloaderPost:
QUALITY_RANK = [
"1080pHdVideo",
"720pHdVideo",
"sdVideoWithPlusAudio",
"sdVideo",
"sd480pVideo",
"provisionalUploadVideo",
]
def __init__(
self,
downloader: Downloader,
quality: PostQuality = PostQuality.BEST,
):
self.downloader = downloader
self.quality = quality
def get_stream_url_best(self, metadata: dict) -> str:
best_quality = next(
(
quality
for quality in self.QUALITY_RANK
if metadata["attributes"]["assetTokens"].get(quality)
),
None,
)
return metadata["attributes"]["assetTokens"][best_quality]
def get_stream_url_from_user(self, metadata: dict) -> str:
qualities = list(metadata["attributes"]["assetTokens"].keys())
choices = [
Choice(
name=quality,
value=quality,
)
for quality in qualities
]
selected = inquirer.select(
message="Select which quality to download:",
choices=choices,
).execute()
return metadata["attributes"]["assetTokens"][selected]
def get_stream_url(self, metadata: dict) -> str:
if self.quality == PostQuality.BEST:
stream_url = self.get_stream_url_best(metadata)
elif self.quality == PostQuality.ASK:
stream_url = self.get_stream_url_from_user(metadata)
return stream_url
def get_tags(self, metadata: dict) -> MediaTags:
attributes = metadata["attributes"]
upload_date = attributes.get("uploadDate")
return MediaTags(
artist=attributes.get("artistName"),
date=self.downloader.parse_date(upload_date) if upload_date else None,
title=attributes.get("name"),
title_id=int(metadata["id"]),
storefront=int(self.downloader.itunes_api.storefront_id.split("-")[0]),
)
def get_cover_path(self, final_path: Path, cover_format: str) -> Path:
return final_path.with_suffix(
self.downloader.get_cover_file_extension(cover_format)
)
def download(
self,
media_id: str = None,
media_metadata: dict = None,
) -> typing.Generator[DownloadInfo, None, None]:
yield from self.downloader._final_processing_wrapper(
self._download,
media_id,
media_metadata,
)
def _download(
self,
media_id: str = None,
media_metadata: dict = None,
) -> typing.Generator[DownloadInfo, None, None]:
download_info = DownloadInfo()
yield download_info
if not media_id and not media_metadata:
raise ValueError("Either media_id or media_metadata must be provided")
if media_metadata:
media_id = media_metadata["id"]
download_info.media_id = media_id
colored_media_id = color_text(media_id, colorama.Style.DIM)
database_final_path = self.downloader.get_database_final_path(media_id)
if database_final_path:
download_info.final_path = database_final_path
yield download_info
raise MediaFileAlreadyExistsException(database_final_path)
if not media_metadata:
logger.debug(f"[{colored_media_id}] Getting Post Video metadata")
media_metadata = self.downloader.apple_music_api.get_post(media_id)
download_info.media_metadata = media_metadata
if not self.downloader.is_media_streamable(media_metadata):
yield download_info
raise MediaNotStreamableException()
tags = self.get_tags(media_metadata)
final_path = self.downloader.get_final_path(
tags,
".m4v",
None,
)
download_info.tags = tags
download_info.final_path = final_path
if final_path.exists() and not self.downloader.overwrite:
yield download_info
raise MediaFileAlreadyExistsException(final_path)
cover_url = self.downloader.get_cover_url(media_metadata)
cover_format = self.downloader.get_cover_format(cover_url)
if cover_format and self.downloader.save_cover:
cover_path = self.get_cover_path(final_path, cover_format)
else:
cover_path = None
download_info.cover_url = cover_url
download_info.cover_format = cover_format
download_info.cover_path = cover_path
stream_url = self.get_stream_url(media_metadata)
staged_path = self.downloader.get_temp_path(
media_id,
"stage",
".m4v",
)
logger.info(f"[{colored_media_id}] Downloading Post Video")
logger.debug(f"[{colored_media_id}] Downloading to {staged_path}")
self.downloader.download_ytdlp(
staged_path,
stream_url,
)
download_info.staged_path = staged_path
yield download_info
-755
View File
@@ -1,755 +0,0 @@
from __future__ import annotations
import base64
import datetime
import json
import logging
import re
import subprocess
import typing
from pathlib import Path
from xml.dom import minidom
from xml.etree import ElementTree
import colorama
import m3u8
from InquirerPy import inquirer
from InquirerPy.base.control import Choice
from pywidevine import PSSH
from pywidevine.license_protocol_pb2 import WidevinePsshData
from .downloader import Downloader
from .enums import MediaFileFormat, RemuxMode, SongCodec, SyncedLyricsFormat
from .exceptions import *
from .models import (
DecryptionKey,
DecryptionKeyAv,
DownloadInfo,
Lyrics,
MediaRating,
MediaTags,
MediaType,
StreamInfo,
StreamInfoAv,
)
from .utils import color_text
logger = logging.getLogger("gamdl")
class DownloaderSong:
DEFAULT_DECRYPTION_KEY = "32b8ade1769e26b1ffb8986352793fc6"
MP4_FORMAT_CODECS = ["ec-3"]
SONG_CODEC_REGEX_MAP = {
SongCodec.AAC: r"audio-stereo-\d+",
SongCodec.AAC_HE: r"audio-HE-stereo-\d+",
SongCodec.AAC_BINAURAL: r"audio-stereo-\d+-binaural",
SongCodec.AAC_DOWNMIX: r"audio-stereo-\d+-downmix",
SongCodec.AAC_HE_BINAURAL: r"audio-HE-stereo-\d+-binaural",
SongCodec.AAC_HE_DOWNMIX: r"audio-HE-stereo-\d+-downmix",
SongCodec.ATMOS: r"audio-atmos-.*",
SongCodec.AC3: r"audio-ac3-.*",
SongCodec.ALAC: r"audio-alac-.*",
}
DRM_DEFAULT_KEY_MAPPING = {
"urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed": (
"data:text/plain;base64,AAAAOHBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAABgSEAAAAAA"
"AAAAAczEvZTEgICBI88aJmwY="
),
"com.microsoft.playready": (
"data:text/plain;charset=UTF-16;base64,vgEAAAEAAQC0ATwAVwBSAE0ASABFAEEARABF"
"AFIAIAB4AG0AbABuAHMAPQAiAGgAdAB0AHAAOgAvAC8AcwBjAGgAZQBtAGEAcwAuAG0AaQBjAH"
"IAbwBzAG8AZgB0AC4AYwBvAG0ALwBEAFIATQAvADIAMAAwADcALwAwADMALwBQAGwAYQB5AFIA"
"ZQBhAGQAeQBIAGUAYQBkAGUAcgAiACAAdgBlAHIAcwBpAG8AbgA9ACIANAAuADMALgAwAC4AMA"
"AiAD4APABEAEEAVABBAD4APABQAFIATwBUAEUAQwBUAEkATgBGAE8APgA8AEsASQBEAFMAPgA8"
"AEsASQBEACAAQQBMAEcASQBEAD0AIgBBAEUAUwBDAEIAQwAiACAAVgBBAEwAVQBFAD0AIgBBAE"
"EAQQBBAEEAQQBBAEEAQQBBAEIAegBNAFMAOQBsAE0AUwBBAGcASQBBAD0APQAiAD4APAAvAEsA"
"SQBEAD4APAAvAEsASQBEAFMAPgA8AC8AUABSAE8AVABFAEMAVABJAE4ARgBPAD4APAAvAEQAQQ"
"BUAEEAPgA8AC8AVwBSAE0ASABFAEEARABFAFIAPgA="
),
"com.apple.streamingkeydelivery": "skd://itunes.apple.com/P000000000/s1/e1",
}
def __init__(
self,
downloader: Downloader,
codec: SongCodec = SongCodec.AAC_LEGACY,
synced_lyrics_format: SyncedLyricsFormat = SyncedLyricsFormat.LRC,
):
self.downloader = downloader
self.codec = codec
self.synced_lyrics_format = synced_lyrics_format
def _search_m3u8_metadata(self, m3u8_data: dict, data_id: str) -> dict:
searched = next(
(
session_data
for session_data in m3u8_data["session_data"]
if session_data["data_id"] == data_id
),
None,
)
if not searched:
return None
return json.loads(base64.b64decode(searched["value"]).decode("utf-8"))
def get_audio_session_key_metadata(self, m3u8_data: dict) -> dict:
return self._search_m3u8_metadata(
m3u8_data,
"com.apple.hls.AudioSessionKeyInfo",
)
def get_asset_metadata(self, m3u8_data: dict) -> dict:
return self._search_m3u8_metadata(
m3u8_data,
"com.apple.hls.audioAssetMetadata",
)
def get_playlist_from_codec(self, m3u8_data: dict) -> dict | None:
m3u8_master_playlists = [
playlist
for playlist in m3u8_data["playlists"]
if re.fullmatch(
self.SONG_CODEC_REGEX_MAP[self.codec], playlist["stream_info"]["audio"]
)
]
if not m3u8_master_playlists:
return None
m3u8_master_playlists.sort(key=lambda x: x["stream_info"]["average_bandwidth"])
return m3u8_master_playlists[-1]
def get_playlist_from_user(self, m3u8_data: dict) -> dict | None:
m3u8_master_playlists = [playlist for playlist in m3u8_data["playlists"]]
choices = [
Choice(
name=playlist["stream_info"]["audio"],
value=playlist,
)
for playlist in m3u8_master_playlists
]
selected = inquirer.select(
message="Select which codec to download:",
choices=choices,
).execute()
return selected
def _get_drm_uri_from_session_key(
self,
drm_infos: dict,
drm_ids: list,
drm_key: str,
) -> str | None:
drm_info = next(
(
drm_infos[drm_id]
for drm_id in drm_ids
if drm_infos[drm_id].get(drm_key) and drm_id != "1"
),
None,
)
if not drm_info:
return None
return drm_info[drm_key]["URI"]
def _get_drm_uri_from_m3u8_keys(
self,
m3u8_obj: m3u8.M3U8,
drm_key: str,
) -> str | None:
drm_uri = next(
(
key
for key in m3u8_obj.keys
if key.keyformat == drm_key
and key.uri != self.DRM_DEFAULT_KEY_MAPPING[drm_key]
),
None,
)
if not drm_uri:
return None
return drm_uri.uri
def _get_stream_info(self, m3u8_url: str) -> StreamInfoAv | None:
stream_info = StreamInfo()
m3u8_master_obj = m3u8.load(m3u8_url)
m3u8_master_data = m3u8_master_obj.data
if self.codec == SongCodec.ASK:
playlist = self.get_playlist_from_user(m3u8_master_data)
else:
playlist = self.get_playlist_from_codec(m3u8_master_data)
if playlist is None:
return None
stream_info.stream_url = m3u8_master_obj.base_uri + playlist["uri"]
stream_info.codec = playlist["stream_info"]["codecs"]
is_mp4 = any(
stream_info.codec.startswith(possible_codec)
for possible_codec in self.MP4_FORMAT_CODECS
)
session_key_metadata = self.get_audio_session_key_metadata(m3u8_master_data)
if session_key_metadata:
asset_metadata = self.get_asset_metadata(m3u8_master_data)
variant_id = playlist["stream_info"]["stable_variant_id"]
drm_ids = asset_metadata[variant_id]["AUDIO-SESSION-KEY-IDS"]
(
stream_info.widevine_pssh,
stream_info.playready_pssh,
stream_info.fairplay_key,
) = (
self._get_drm_uri_from_session_key(
session_key_metadata,
drm_ids,
drm_key,
)
for drm_key in self.DRM_DEFAULT_KEY_MAPPING.keys()
)
else:
m3u8_obj = m3u8.load(stream_info.stream_url)
(
stream_info.widevine_pssh,
stream_info.playready_pssh,
stream_info.fairplay_key,
) = (
self._get_drm_uri_from_m3u8_keys(
m3u8_obj,
drm_key,
)
for drm_key in self.DRM_DEFAULT_KEY_MAPPING.keys()
)
return StreamInfoAv(
audio_track=stream_info,
file_format=MediaFileFormat.MP4 if is_mp4 else MediaFileFormat.M4A,
)
def get_stream_info(self, track_metadata: dict) -> StreamInfoAv | None:
m3u8_url = track_metadata["attributes"]["extendedAssetUrls"].get("enhancedHls")
if not m3u8_url:
return None
return self._get_stream_info(m3u8_url)
def get_stream_info_legacy(self, webplayback: dict) -> StreamInfoAv:
flavor = "32:ctrp64" if self.codec == SongCodec.AAC_HE_LEGACY else "28:ctrp256"
stream_info = StreamInfo()
stream_info.stream_url = next(
i for i in webplayback["assets"] if i["flavor"] == flavor
)["URL"]
m3u8_obj = m3u8.load(stream_info.stream_url)
stream_info.widevine_pssh = m3u8_obj.keys[0].uri
return StreamInfoAv(
audio_track=stream_info,
file_format=MediaFileFormat.M4A,
)
def get_decryption_key(
self,
stream_info: StreamInfoAv,
media_id: str,
) -> DecryptionKeyAv:
decryption_key = self.downloader.get_decryption_key(
stream_info.audio_track.widevine_pssh,
media_id,
)
return DecryptionKeyAv(
audio_track=decryption_key,
)
def get_decryption_key_legacy(
self,
stream_info: StreamInfoAv,
media_id: str,
) -> DecryptionKeyAv:
stream_info_audio = stream_info.audio_track
try:
cdm_session = self.downloader.cdm.open()
widevine_pssh_data = WidevinePsshData()
widevine_pssh_data.algorithm = 1
widevine_pssh_data.key_ids.append(
base64.b64decode(stream_info_audio.widevine_pssh.split(",")[1])
)
pssh_obj = PSSH(widevine_pssh_data.SerializeToString())
challenge = base64.b64encode(
self.downloader.cdm.get_license_challenge(cdm_session, pssh_obj)
).decode()
license = self.downloader.apple_music_api.get_widevine_license(
media_id,
stream_info.audio_track.widevine_pssh,
challenge,
)
self.downloader.cdm.parse_license(cdm_session, license)
decryption_key = next(
i
for i in self.downloader.cdm.get_keys(cdm_session)
if i.type == "CONTENT"
)
finally:
self.downloader.cdm.close(cdm_session)
return DecryptionKeyAv(
audio_track=DecryptionKey(
kid=decryption_key.kid.hex,
key=decryption_key.key.hex(),
)
)
@staticmethod
def parse_datetime_obj_from_timestamp_ttml(
timestamp_ttml: str,
) -> datetime.datetime:
mins_secs_ms = re.findall(r"\d+", timestamp_ttml)
ms, secs, mins = 0, 0, 0
if len(mins_secs_ms) == 2 and ":" in timestamp_ttml:
secs, mins = int(mins_secs_ms[-1]), int(mins_secs_ms[-2])
elif len(mins_secs_ms) == 1:
ms = int(mins_secs_ms[-1])
else:
secs = float(f"{mins_secs_ms[-2]}.{mins_secs_ms[-1]}")
if len(mins_secs_ms) > 2:
mins = int(mins_secs_ms[-3])
return datetime.datetime.fromtimestamp(
(mins * 60) + secs + (ms / 1000),
tz=datetime.timezone.utc,
)
def get_lyrics_synced_timestamp_lrc(self, timestamp_ttml: str) -> str:
datetime_obj = self.parse_datetime_obj_from_timestamp_ttml(timestamp_ttml)
ms_new = datetime_obj.strftime("%f")[:-3]
if int(ms_new[-1]) >= 5:
ms = int(f"{int(ms_new[:2]) + 1}") * 10
datetime_obj += datetime.timedelta(milliseconds=ms) - datetime.timedelta(
microseconds=datetime_obj.microsecond
)
return datetime_obj.strftime("%M:%S.%f")[:-4]
def get_lyrics_synced_timestamp_srt(self, timestamp_ttml: str) -> str:
datetime_obj = self.parse_datetime_obj_from_timestamp_ttml(timestamp_ttml)
return datetime_obj.strftime("00:%M:%S,%f")[:-3]
def get_lyrics_synced_line_lrc(self, timestamp_ttml: str, text: str) -> str:
return f"[{self.get_lyrics_synced_timestamp_lrc(timestamp_ttml)}]{text}"
def get_lyrics_synced_line_srt(
self,
index: int,
timestamp_ttml_start: str,
timestamp_ttml_end: str,
text: str,
) -> str:
timestamp_srt_start = self.get_lyrics_synced_timestamp_srt(timestamp_ttml_start)
timestamp_srt_end = self.get_lyrics_synced_timestamp_srt(timestamp_ttml_end)
return f"{index}\n{timestamp_srt_start} --> {timestamp_srt_end}\n{text}\n"
def get_lyrics(self, track_metadata: dict) -> Lyrics | None:
lyrics = Lyrics()
if not track_metadata["attributes"]["hasLyrics"]:
return None
elif track_metadata.get("relationships") is None:
track_metadata = self.downloader.apple_music_api.get_song(
self.downloader.get_media_id_of_library_media(track_metadata)
)
if (
track_metadata["relationships"].get("lyrics")
and track_metadata["relationships"]["lyrics"].get("data")
and track_metadata["relationships"]["lyrics"]["data"][0].get("attributes")
):
lyrics = self._get_lyrics(
track_metadata["relationships"]["lyrics"]["data"][0]["attributes"][
"ttml"
]
)
return lyrics
def _get_lyrics(self, lyrics_ttml: str) -> Lyrics:
lyrics_ttml_et = ElementTree.fromstring(lyrics_ttml)
unsynced_lyrics = []
synced_lyrics = []
index = 1
for div in lyrics_ttml_et.iter("{http://www.w3.org/ns/ttml}div"):
stanza = []
unsynced_lyrics.append(stanza)
for p in div.iter("{http://www.w3.org/ns/ttml}p"):
if p.text is not None:
stanza.append(p.text)
if p.attrib.get("begin"):
if self.synced_lyrics_format == SyncedLyricsFormat.LRC:
synced_lyrics.append(
f"{self.get_lyrics_synced_line_lrc(p.attrib.get('begin'), p.text)}"
)
if self.synced_lyrics_format == SyncedLyricsFormat.SRT:
synced_lyrics.append(
f"{self.get_lyrics_synced_line_srt(index, p.attrib.get('begin'), p.attrib.get('end'), p.text)}"
)
if self.synced_lyrics_format == SyncedLyricsFormat.TTML:
if not synced_lyrics:
synced_lyrics.append(
minidom.parseString(lyrics_ttml).toprettyxml()
)
continue
index += 1
return Lyrics(
synced="\n".join(synced_lyrics) + "\n",
unsynced="\n\n".join(
["\n".join(lyric_group) for lyric_group in unsynced_lyrics]
),
)
def get_tags(self, webplayback: dict, lyrics_unsynced: str) -> MediaTags:
webplayback_metadata = webplayback["assets"][0]["metadata"]
tags = MediaTags(
album=webplayback_metadata["playlistName"],
album_artist=webplayback_metadata["playlistArtistName"],
album_id=int(webplayback_metadata["playlistId"]),
album_sort=webplayback_metadata["sort-album"],
artist=webplayback_metadata["artistName"],
artist_id=int(webplayback_metadata["artistId"]),
artist_sort=webplayback_metadata["sort-artist"],
comment=webplayback_metadata.get("comments"),
compilation=webplayback_metadata["compilation"],
composer=webplayback_metadata.get("composerName"),
composer_id=(
int(webplayback_metadata.get("composerId"))
if webplayback_metadata.get("composerId")
else None
),
composer_sort=webplayback_metadata.get("sort-composer"),
copyright=webplayback_metadata.get("copyright"),
date=(
self.downloader.parse_date(webplayback_metadata["releaseDate"])
if webplayback_metadata.get("releaseDate")
else None
),
disc=webplayback_metadata["discNumber"],
disc_total=webplayback_metadata["discCount"],
gapless=webplayback_metadata["gapless"],
genre=webplayback_metadata.get("genre"),
genre_id=int(webplayback_metadata["genreId"]),
lyrics=lyrics_unsynced if lyrics_unsynced else None,
media_type=MediaType.SONG,
rating=MediaRating(webplayback_metadata["explicit"]),
storefront=webplayback_metadata["s"],
title=webplayback_metadata["itemName"],
title_id=int(webplayback_metadata["itemId"]),
title_sort=webplayback_metadata["sort-name"],
track=webplayback_metadata["trackNumber"],
track_total=webplayback_metadata["trackCount"],
xid=webplayback_metadata.get("xid"),
)
return tags
def fix_key_id(self, encrypted_path: Path):
count = 0
with open(encrypted_path, "rb+") as file:
while data := file.read(4096):
pos = file.tell()
i = 0
while tenc := max(0, data.find(b"tenc", i)):
kid = tenc + 12
file.seek(max(0, pos - 4096) + kid, 0)
file.write(bytes.fromhex(f"{count:032}"))
count += 1
i = kid + 1
file.seek(pos, 0)
def decrypt(
self,
encrypted_path: Path,
decrypted_path: Path,
decryption_key: str,
codec: SongCodec,
):
if codec.is_legacy():
keys = [
"--key",
f"1:{decryption_key}",
]
else:
self.fix_key_id(encrypted_path)
keys = [
"--key",
"0" * 31 + "1" + f":{decryption_key}",
"--key",
"0" * 32 + f":{self.DEFAULT_DECRYPTION_KEY}",
]
subprocess.run(
[
self.downloader.mp4decrypt_path_full,
*keys,
encrypted_path,
decrypted_path,
],
check=True,
**self.downloader.subprocess_additional_args,
)
def stage(
self,
codec: SongCodec,
encrypted_path: Path,
decrypted_path: Path,
decryption_key: DecryptionKeyAv,
staged_path: Path,
):
if codec.is_legacy() and self.downloader.remux_mode == RemuxMode.FFMPEG:
self.remux_ffmpeg(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
)
else:
self.decrypt(
encrypted_path,
decrypted_path,
decryption_key.audio_track.key,
codec,
)
if self.downloader.remux_mode == RemuxMode.FFMPEG:
self.remux_ffmpeg(
decrypted_path,
staged_path,
)
else:
self.remux_mp4box(
decrypted_path,
staged_path,
)
def remux_mp4box(self, decrypted_path: Path, remuxed_path: Path):
subprocess.run(
[
self.downloader.mp4box_path_full,
"-quiet",
"-add",
decrypted_path,
"-itags",
"artist=placeholder",
"-keep-utc",
"-new",
remuxed_path,
],
check=True,
**self.downloader.subprocess_additional_args,
)
def remux_ffmpeg(
self,
decrypted_path: Path,
remuxed_path: Path,
decryption_key: str = None,
):
if decryption_key:
decryption_key_arg = [
"-decryption_key",
decryption_key,
]
else:
decryption_key_arg = []
subprocess.run(
[
self.downloader.ffmpeg_path_full,
"-loglevel",
"error",
"-y",
*decryption_key_arg,
"-i",
decrypted_path,
"-c",
"copy",
"-movflags",
"+faststart",
remuxed_path,
],
check=True,
**self.downloader.subprocess_additional_args,
)
def get_lyrics_synced_path(self, final_path: Path) -> Path:
return final_path.with_suffix("." + self.synced_lyrics_format.value)
def get_cover_path(self, final_path: Path, cover_format: str) -> Path:
return final_path.parent / (
"Cover" + self.downloader.get_cover_file_extension(cover_format)
)
def save_lyrics_synced(self, lyrics_synced_path: Path, lyrics_synced: str):
lyrics_synced_path.parent.mkdir(parents=True, exist_ok=True)
lyrics_synced_path.write_text(lyrics_synced, encoding="utf8")
def download(
self,
media_id: str = None,
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> typing.Generator[DownloadInfo, None, None]:
yield from self.downloader._final_processing_wrapper(
self._download,
media_id,
media_metadata,
playlist_attributes,
playlist_track,
)
def _download(
self,
media_id: str = None,
media_metadata: dict = None,
playlist_attributes: dict = None,
playlist_track: int = None,
) -> typing.Generator[DownloadInfo, None, None]:
download_info = DownloadInfo()
yield download_info
if playlist_track is None and playlist_attributes:
raise ValueError(
"playlist_track must be provided if playlist_attributes is provided"
)
if playlist_attributes:
playlist_tags = self.downloader.get_playlist_tags(
playlist_attributes,
playlist_track,
)
else:
playlist_tags = None
download_info.playlist_tags = playlist_tags
if not media_id and not media_metadata:
raise ValueError("Either media_id or media_metadata must be provided")
if media_metadata:
media_id = self.downloader.get_media_id_of_library_media(media_metadata)
download_info.media_id = media_id
colored_media_id = color_text(media_id, colorama.Style.DIM)
database_final_path = self.downloader.get_database_final_path(media_id)
if database_final_path:
download_info.final_path = database_final_path
yield download_info
raise MediaFileAlreadyExistsException(database_final_path)
if not media_metadata:
logger.debug(f"[{colored_media_id}] Getting Song metadata")
media_metadata = self.downloader.apple_music_api.get_song(media_id)
download_info.media_metadata = media_metadata
if not self.downloader.is_media_streamable(media_metadata):
raise MediaNotStreamableException()
logger.debug(f"[{colored_media_id}] Getting lyrics")
lyrics = self.get_lyrics(media_metadata)
download_info.lyrics = lyrics
logger.debug(f"[{colored_media_id}] Getting webplayback info")
webplayback = self.downloader.apple_music_api.get_webplayback(
media_id,
)
tags = self.get_tags(
webplayback,
lyrics.unsynced if lyrics else None,
)
final_path = self.downloader.get_final_path(tags, ".m4a", playlist_tags)
download_info.tags = tags
download_info.final_path = final_path
if lyrics and lyrics.synced:
synced_lyrics_path = self.get_lyrics_synced_path(final_path)
else:
synced_lyrics_path = None
download_info.synced_lyrics_path = synced_lyrics_path
if self.downloader.synced_lyrics_only:
logger.info(
f"[{colored_media_id}] Downloading synced lyrics only, skipping song download"
)
yield download_info
return
cover_url = self.downloader.get_cover_url(media_metadata)
cover_format = self.downloader.get_cover_format(cover_url)
if cover_format:
cover_path = self.get_cover_path(final_path, cover_format)
else:
cover_path = None
download_info.cover_url = cover_url
download_info.cover_format = cover_format
download_info.cover_path = cover_path
if final_path.exists() and not self.downloader.overwrite:
yield download_info
raise MediaFileAlreadyExistsException(final_path)
logger.debug(f"[{colored_media_id}] Getting stream info")
if self.codec.is_legacy():
stream_info = self.get_stream_info_legacy(webplayback)
logger.debug(f"[{colored_media_id}] Getting decryption key")
decryption_key = self.get_decryption_key_legacy(
stream_info,
media_id,
)
download_info.stream_info = stream_info
download_info.decryption_key = decryption_key
else:
stream_info = self.get_stream_info(media_metadata)
if not stream_info or not stream_info.audio_track.widevine_pssh:
yield download_info
raise MediaFormatNotAvailableException()
logger.debug(f"[{colored_media_id}] Getting decryption key")
decryption_key = self.get_decryption_key(
stream_info,
media_id,
)
download_info.stream_info = stream_info
download_info.decryption_key = decryption_key
encrypted_path = self.downloader.get_temp_path(
media_id,
"encrypted",
".m4a",
)
decrypted_path = self.downloader.get_temp_path(
media_id,
"decrypted",
".m4a",
)
staged_path = self.downloader.get_temp_path(
media_id,
"staged",
self.downloader.get_media_file_extension(stream_info.file_format),
)
logger.info(f"[{colored_media_id}] Downloading song")
logger.debug(f'[{colored_media_id}] Downloading to "{encrypted_path}"')
self.downloader.download(
encrypted_path,
download_info.stream_info.audio_track.stream_url,
)
logger.debug(
f'[{colored_media_id}] Decryping/remuxing to "{decrypted_path}"/"{staged_path}"'
)
self.stage(
self.codec,
encrypted_path,
decrypted_path,
decryption_key,
staged_path,
)
download_info.staged_path = staged_path
yield download_info
-113
View File
@@ -1,113 +0,0 @@
from enum import Enum
class DownloadMode(Enum):
YTDLP = "ytdlp"
NM3U8DLRE = "nm3u8dlre"
class RemuxMode(Enum):
FFMPEG = "ffmpeg"
MP4BOX = "mp4box"
class SongCodec(Enum):
AAC_LEGACY = "aac-legacy"
AAC_HE_LEGACY = "aac-he-legacy"
AAC = "aac"
AAC_HE = "aac-he"
AAC_BINAURAL = "aac-binaural"
AAC_DOWNMIX = "aac-downmix"
AAC_HE_BINAURAL = "aac-he-binaural"
AAC_HE_DOWNMIX = "aac-he-downmix"
ATMOS = "atmos"
AC3 = "ac3"
ALAC = "alac"
ASK = "ask"
def is_legacy(self) -> bool:
return self in {SongCodec.AAC_LEGACY, SongCodec.AAC_HE_LEGACY}
class SyncedLyricsFormat(Enum):
LRC = "lrc"
SRT = "srt"
TTML = "ttml"
class MusicVideoCodec(Enum):
H264 = "h264"
H265 = "h265"
ASK = "ask"
def fourcc(self) -> str:
return {
MusicVideoCodec.H264: "avc1",
MusicVideoCodec.H265: "hvc1",
}.get(self)
class RemuxFormatMusicVideo(Enum):
M4V = "m4v"
MP4 = "mp4"
class MusicVideoResolution(Enum):
R240P = "240p"
R360P = "360p"
R480P = "480p"
R540P = "540p"
R720P = "720p"
R1080P = "1080p"
R1440P = "1440p"
R2160P = "2160p"
def __int__(self) -> int:
return int(self.value[:-1])
class MediaFileFormat(Enum):
M4A = "m4a"
MP4 = "mp4"
M4V = "m4v"
class PostQuality(Enum):
BEST = "best"
ASK = "ask"
class CoverFormat(Enum):
JPG = "jpg"
PNG = "png"
RAW = "raw"
class MediaType(Enum):
SONG = 1
MUSIC_VIDEO = 6
def __str__(self) -> str:
return {
MediaType.SONG: "Song",
MediaType.MUSIC_VIDEO: "Music Video",
}[self]
def __int__(self) -> int:
return self.value
class MediaRating(Enum):
NONE = 0
EXPLICIT = 1
CLEAN = 2
def __str__(self) -> str:
return {
MediaRating.NONE: "None",
MediaRating.EXPLICIT: "Explicit",
MediaRating.CLEAN: "Clean",
}[self]
def __int__(self) -> int:
return self.value
-24
View File
@@ -1,24 +0,0 @@
from __future__ import annotations
from pathlib import Path
class MediaNotStreamableException(Exception):
DEFAULT_MESSAGE = "Media is not streamable"
def __init__(self):
super().__init__(self.DEFAULT_MESSAGE)
class MediaFileAlreadyExistsException(Exception):
DEFAULT_MESSAGE = "Media file already exists at '{media_path}'"
def __init__(self, media_path: Path):
super().__init__(self.DEFAULT_MESSAGE.format(media_path=media_path))
class MediaFormatNotAvailableException(Exception):
DEFAULT_MESSAGE = "Requested media format or codec not available"
def __init__(self):
super().__init__(self.DEFAULT_MESSAGE)
-3
View File
@@ -1,3 +0,0 @@
# Dumped from Android Studio Virtual Device running Android 9
HARDCODED_WVD = """V1ZEAgIDAASoMIIEpAIBAAKCAQEAwnCFAPXy4U1J7p1NohAS+xl040f5FBaE/59bPp301bGz0UGFT9VoEtY3vaeakKh/d319xTNvCSWsEDRaMmp/wSnMiEZUkkl04872jx2uHuR4k6KYuuJoqhsIo1TwUBueFZynHBUJzXQeW8Eb1tYAROGwp8W7r+b0RIjHC89RFnfVXpYlF5I6McktyzJNSOwlQbMqlVihfSUkv3WRd3HFmA0Oxay51CEIkoTlNTHVlzVyhov5eHCDSp7QENRgaaQ03jC/CcgFOoQymhsBtRCM0CQmfuAHjA9e77R6m/GJPy75G9fqoZM1RMzVDHKbKZPd3sFd0c0+77gLzW8cWEaaHwIDAQABAoIBAQCB2pN46MikHvHZIcTPDt0eRQoDH/YArGl2Lf7J+sOgU2U7wv49KtCug9IGHwDiyyUVsAFmycrF2RroV45FTUq0vi2SdSXV7Kjb20Ren/vBNeQw9M37QWmU8Sj7q6YyWb9hv5T69DHvvDTqIjVtbM4RMojAAxYti5hmjNIh2PrWfVYWhXxCQ/WqAjWLtZBM6Oww1byfr5I/wFogAKkgHi8wYXZ4LnIC8V7jLAhujlToOvMMC9qwcBiPKDP2FO+CPSXaqVhH+LPSEgLggnU3EirihgxovbLNAuDEeEbRTyR70B0lW19tLHixso4ZQa7KxlVUwOmrHSZf7nVuWqPpxd+BAoGBAPQLyJ1IeRavmaU8XXxfMdYDoc8+xB7v2WaxkGXb6ToX1IWPkbMz4yyVGdB5PciIP3rLZ6s1+ruuRRV0IZ98i1OuN5TSR56ShCGg3zkd5C4L/xSMAz+NDfYSDBdO8BVvBsw21KqSRUi1ctL7QiIvfedrtGb5XrE4zhH0gjXlU5qZAoGBAMv2segn0Jx6az4rqRa2Y7zRx4iZ77JUqYDBI8WMnFeR54uiioTQ+rOs3zK2fGIWlrn4ohco/STHQSUTB8oCOFLMx1BkOqiR+UyebO28DJY7+V9ZmxB2Guyi7W8VScJcIdpSOPyJFOWZQKXdQFW3YICD2/toUx/pDAJh1sEVQsV3AoGBANyyp1rthmvoo5cVbymhYQ08vaERDwU3PLCtFXu4E0Ow90VNn6Ki4ueXcv/gFOp7pISk2/yuVTBTGjCblCiJ1en4HFWekJwrvgg3Vodtq8Okn6pyMCHRqvWEPqD5hw6rGEensk0K+FMXnF6GULlfn4mgEkYpb+PvDhSYvQSGfkPJAoGAF/bAKFqlM/1eJEvU7go35bNwEiij9Pvlfm8y2L8Qj2lhHxLV240CJ6IkBz1Rl+S3iNohkT8LnwqaKNT3kVB5daEBufxMuAmOlOX4PmZdxDj/r6hDg8ecmjj6VJbXt7JDd/c5ItKoVeGPqu035dpJyE+1xPAY9CLZel4scTsiQTkCgYBt3buRcZMwnc4qqpOOQcXK+DWD6QvpkcJ55ygHYw97iP/lF4euwdHd+I5b+11pJBAao7G0fHX3eSjqOmzReSKboSe5L8ZLB2cAI8AsKTBfKHWmCa8kDtgQuI86fUfirCGdhdA9AVP2QXN2eNCuPnFWi0WHm4fYuUB5be2c18ucxAb9CAESmgsK3QMIAhIQ071yBlsbLoO2CSB9Ds0cmRif6uevBiKOAjCCAQoCggEBAMJwhQD18uFNSe6dTaIQEvsZdONH+RQWhP+fWz6d9NWxs9FBhU/VaBLWN72nmpCof3d9fcUzbwklrBA0WjJqf8EpzIhGVJJJdOPO9o8drh7keJOimLriaKobCKNU8FAbnhWcpxwVCc10HlvBG9bWAEThsKfFu6/m9ESIxwvPURZ31V6WJReSOjHJLcsyTUjsJUGzKpVYoX0lJL91kXdxxZgNDsWsudQhCJKE5TUx1Zc1coaL+Xhwg0qe0BDUYGmkNN4wvwnIBTqEMpobAbUQjNAkJn7gB4wPXu+0epvxiT8u+RvX6qGTNUTM1QxymymT3d7BXdHNPu+4C81vHFhGmh8CAwEAASjwIkgBUqoBCAEQABqBAQQlRbfiBNDb6eU6aKrsH5WJaYszTioXjPLrWN9dqyW0vwfT11kgF0BbCGkAXew2tLJJqIuD95cjJvyGUSN6VyhL6dp44fWEGDSBIPR0mvRq7bMP+m7Y/RLKf83+OyVJu/BpxivQGC5YDL9f1/A8eLhTDNKXs4Ia5DrmTWdPTPBL8SIgyfUtg3ofI+/I9Tf7it7xXpT0AbQBJfNkcNXGpO3JcBMSgAIL5xsXK5of1mMwAl6ygN1Gsj4aZ052otnwN7kXk12SMsXheWTZ/PYh2KRzmt9RPS1T8hyFx/Kp5VkBV2vTAqqWrGw/dh4URqiHATZJUlhO7PN5m2Kq1LVFdXjWSzP5XBF2S83UMe+YruNHpE5GQrSyZcBqHO0QrdPcU35GBT7S7+IJr2AAXvnjqnb8yrtpPWN2ZW/IWUJN2z4vZ7/HV4aj3OZhkxC1DIMNyvsusUKoQQuf8gwKiEe8cFwbwFSicywlFk9la2IPe8oFShcxAzHLCCn/TIYUAvEL3/4LgaZvqWm80qCPYbgIP5HT8hPYkKWJ4WYknEWK+3InbnkzteFfGrQFCq4CCAESEGnj6Ji7LD+4o7MoHYT4jBQYjtW+kQUijgIwggEKAoIBAQDY9um1ifBRIOmkPtDZTqH+CZUBbb0eK0Cn3NHFf8MFUDzPEz+emK/OTub/hNxCJCao//pP5L8tRNUPFDrrvCBMo7Rn+iUb+mA/2yXiJ6ivqcN9Cu9i5qOU1ygon9SWZRsujFFB8nxVreY5Lzeq0283zn1Cg1stcX4tOHT7utPzFG/ReDFQt0O/GLlzVwB0d1sn3SKMO4XLjhZdncrtF9jljpg7xjMIlnWJUqxDo7TQkTytJmUl0kcM7bndBLerAdJFGaXc6oSY4eNy/IGDluLCQR3KZEQsy/mLeV1ggQ44MFr7XOM+rd+4/314q/deQbjHqjWFuVr8iIaKbq+R63ShAgMBAAEo8CISgAMii2Mw6z+Qs1bvvxGStie9tpcgoO2uAt5Zvv0CDXvrFlwnSbo+qR71Ru2IlZWVSbN5XYSIDwcwBzHjY8rNr3fgsXtSJty425djNQtF5+J2jrAhf3Q2m7EI5aohZGpD2E0cr+dVj9o8x0uJR2NWR8FVoVQSXZpad3M/4QzBLNto/tz+UKyZwa7Sc/eTQc2+ZcDS3ZEO3lGRsH864Kf/cEGvJRBBqcpJXKfG+ItqEW1AAPptjuggzmZEzRq5xTGf6or+bXrKjCpBS9G1SOyvCNF1k5z6lG8KsXhgQxL6ADHMoulxvUIihyPY5MpimdXfUdEQ5HA2EqNiNVNIO4qP007jW51yAeThOry4J22xs8RdkIClOGAauLIl0lLA4flMzW+VfQl5xYxP0E5tuhn0h+844DslU8ZF7U1dU2QprIApffXD9wgAACk26Rggy8e96z8i86/+YYyZQkc9hIdCAERrgEYCEbByzONrdRDs1MrS/ch1moV5pJv63BIKvQHGvLkaFwoMY29tcGFueV9uYW1lEgd1bmtub3duGioKCm1vZGVsX25hbWUSHEFuZHJvaWQgU0RLIGJ1aWx0IGZvciB4ODZfNjQaGwoRYXJjaGl0ZWN0dXJlX25hbWUSBng4Nl82NBodCgtkZXZpY2VfbmFtZRIOZ2VuZXJpY194ODZfNjQaIAoMcHJvZHVjdF9uYW1lEhBzZGtfcGhvbmVfeDg2XzY0GmMKCmJ1aWxkX2luZm8SVUFuZHJvaWQvc2RrX3Bob25lX3g4Nl82NC9nZW5lcmljX3g4Nl82NDo5L1BTUjEuMTgwNzIwLjAxMi80OTIzMjE0OnVzZXJkZWJ1Zy90ZXN0LWtleXMaHgoUd2lkZXZpbmVfY2RtX3ZlcnNpb24SBjE0LjAuMBokCh9vZW1fY3J5cHRvX3NlY3VyaXR5X3BhdGNoX2xldmVsEgEwMg4QASAAKA0wAEAASABQAA=="""
-179
View File
@@ -1,179 +0,0 @@
from __future__ import annotations
import datetime
import typing
from dataclasses import dataclass
from pathlib import Path
from .enums import MediaFileFormat, MediaRating, MediaType
@dataclass
class UrlInfo:
storefront: str = None
type: str = None
slug: str = None
id: str = None
sub_id: str = None
library_storefront: str = None
library_type: str = None
library_id: str = None
@dataclass
class DownloadQueue:
playlist_attributes: dict = None
medias_metadata: list[dict] = None
@dataclass
class Lyrics:
synced: str = None
unsynced: str = None
@dataclass
class StreamInfo:
stream_url: str = None
widevine_pssh: str = None
playready_pssh: str = None
fairplay_key: str = None
codec: str = None
@dataclass
class StreamInfoAv:
video_track: StreamInfo = None
audio_track: StreamInfo = None
file_format: MediaFileFormat = None
@dataclass
class DecryptionKey:
kid: str = None
key: str = None
@dataclass
class DecryptionKeyAv:
video_track: DecryptionKey = None
audio_track: DecryptionKey = None
@dataclass
class MediaTags:
album: str = None
album_artist: str = None
album_id: int = None
album_sort: str = None
artist: str = None
artist_id: int = None
artist_sort: str = None
comment: str = None
compilation: bool = None
composer: str = None
composer_id: int = None
composer_sort: str = None
copyright: str = None
date: datetime.date | str = None
disc: int = None
disc_total: int = None
gapless: bool = None
genre: str = None
genre_id: int = None
lyrics: str = None
media_type: MediaType = None
rating: MediaRating = None
storefront: str = None
title: str = None
title_id: int = None
title_sort: str = None
track: int = None
track_total: int = None
xid: str = None
def to_mp4_tags(self, date_format: str = None) -> dict[str, typing.Any]:
disc_mp4 = [
[
self.disc if self.disc is not None else 0,
self.disc_total if self.disc_total is not None else 0,
]
]
if disc_mp4[0][0] == 0 and disc_mp4[0][1] == 0:
disc_mp4 = [None]
track_mp4 = [
[
self.track if self.track is not None else 0,
self.track_total if self.track_total is not None else 0,
]
]
if track_mp4[0][0] == 0 and track_mp4[0][1] == 0:
track_mp4 = [None]
if isinstance(self.date, datetime.date):
if date_format is None:
date_mp4 = self.date.isoformat()
else:
date_mp4 = self.date.strftime(date_format)
elif isinstance(self.date, str):
date_mp4 = self.date
else:
date_mp4 = None
mp4_tags = {
"\xa9alb": [self.album],
"aART": [self.album_artist],
"plID": [self.album_id],
"soal": [self.album_sort],
"\xa9ART": [self.artist],
"atID": [self.artist_id],
"soar": [self.artist_sort],
"\xa9cmt": [self.comment],
"cpil": [bool(self.compilation) if self.compilation is not None else None],
"\xa9wrt": [self.composer],
"cmID": [self.composer_id],
"soco": [self.composer_sort],
"cprt": [self.copyright],
"\xa9day": [date_mp4],
"disk": disc_mp4,
"pgap": [bool(self.gapless) if self.gapless is not None else None],
"\xa9gen": [self.genre],
"\xa9lyr": [self.lyrics],
"geID": [self.genre_id],
"stik": [int(self.media_type) if self.media_type is not None else None],
"rtng": [int(self.rating) if self.rating is not None else None],
"sfID": [self.storefront],
"\xa9nam": [self.title],
"cnID": [self.title_id],
"sonm": [self.title_sort],
"trkn": track_mp4,
"xid ": [self.xid],
}
return {k: v for k, v in mp4_tags.items() if v[0] is not None}
@dataclass
class PlaylistTags:
playlist_artist: str = None
playlist_id: int = None
playlist_title: str = None
playlist_track: int = None
@dataclass
class DownloadInfo:
media_metadata: dict = None
media_id: str = None
alt_media_id: str = None
playlist_tags: PlaylistTags = None
lyrics: Lyrics = None
tags: MediaTags = None
final_path: Path = None
cover_url: str = None
cover_format: str = None
cover_path: Path = None
stream_info: StreamInfoAv = None
decryption_key: DecryptionKeyAv = None
staged_path: Path = None
synced_lyrics_path: Path = None