Use wrapper-v2 HTTP decrypt for FairPlay CBCS.

Point amdecrypt at POST /decrypt with batched samples, robust moof/trun/senc parsing and CBCS subsample handling, and CLI/base defaults for the daemon URL. Update download/song and Apple Music paths to use the new flow; includes formatting and related API touch-ups.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Rafael Moraes
2026-05-17 11:24:20 -03:00
parent 82e3cf20a0
commit 2205b76c07
7 changed files with 3716 additions and 3568 deletions
+608 -608
View File
File diff suppressed because it is too large Load Diff
+306 -306
View File
@@ -1,306 +1,306 @@
import asyncio
from functools import wraps
from pathlib import Path
import click
import colorama
import structlog
from dataclass_click import dataclass_click
from httpx import ConnectError
from .. import __version__
from ..api import AppleMusicApi
from ..downloader import (
AppleMusicBaseDownloader,
AppleMusicDownloader,
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderSyncedLyricsOnlyError,
)
from ..interface import (
AppleMusicBaseInterface,
AppleMusicInterface,
AppleMusicMusicVideoInterface,
AppleMusicSongInterface,
AppleMusicUploadedVideoInterface,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceFlatFilterExcludedError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceUrlParseError,
)
from .cli_config import CliConfig
from .config_file import ConfigFile
from .database import Database
from .interactive_prompts import InteractivePrompts
from .utils import CustomOutputWriter, custom_structlog_formatter, prompt_path
logger = structlog.get_logger(__name__)
def make_sync(func):
@wraps(func)
def wrapper(*args, **kwargs):
return asyncio.run(func(*args, **kwargs))
return wrapper
@click.command()
@click.help_option("-h", "--help")
@click.version_option(__version__, "-v", "--version")
@dataclass_click(CliConfig)
@ConfigFile.loader
@make_sync
async def main(config: CliConfig):
colorama.just_fix_windows_console()
log_output = CustomOutputWriter()
if config.log_file:
log_output.add_file(config.log_file)
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.ExceptionPrettyPrinter(),
custom_structlog_formatter,
],
logger_factory=structlog.PrintLoggerFactory(file=log_output),
wrapper_class=structlog.make_filtering_bound_logger(config.log_level),
)
logger.info(f"Starting Gamdl {__version__}")
if config.use_wrapper:
try:
apple_music_api = await AppleMusicApi.create_from_wrapper(
wrapper_account_url=config.wrapper_account_url,
language=config.language,
)
except ConnectError:
logger.critical(
"Could not connect to the wrapper account API. "
"Make sure the wrapper is running and the URL is correct."
)
return
else:
cookies_path = prompt_path(config.cookies_path)
apple_music_api = await AppleMusicApi.create_from_netscape_cookies(
cookies_path=cookies_path,
language=config.language,
)
if not apple_music_api.active_subscription:
logger.critical(
"No active Apple Music subscription found, you won't be able to download"
" anything"
)
return
if apple_music_api.account_restrictions:
logger.warning(
"Your account has content restrictions enabled, some content may not be"
" downloadable"
)
if (
any(not codec.is_legacy() for codec in config.song_codec_piority)
and not config.use_wrapper
):
logger.warning(
"You have chosen an experimental song codec "
"without enabling wrapper. "
"They're not guaranteed to work due to API limitations."
)
if config.database_path:
database = Database(config.database_path, config.overwrite)
flat_filter = database.flat_filter
else:
database = None
flat_filter = None
interactive_prompts = InteractivePrompts(
artist_auto_select=config.artist_auto_select,
)
base_interface = await AppleMusicBaseInterface.create(
apple_music_api=apple_music_api,
cover_format=config.cover_format,
cover_size=config.cover_size,
use_wrapper=config.use_wrapper,
wrapper_m3u8_ip=config.wrapper_m3u8_ip,
wvd_path=config.wvd_path,
)
song_interface = AppleMusicSongInterface(
base=base_interface,
synced_lyrics_format=config.synced_lyrics_format,
codec_priority=config.song_codec_piority,
use_album_date=config.use_album_date,
skip_stream_info=config.synced_lyrics_only,
ask_codec_function=interactive_prompts.ask_song_codec,
)
music_video_interface = AppleMusicMusicVideoInterface(
base=base_interface,
resolution=config.music_video_resolution,
codec_priority=config.music_video_codec_priority,
ask_video_codec_function=interactive_prompts.ask_music_video_video_codec_function,
ask_audio_codec_function=interactive_prompts.ask_music_video_audio_codec_function,
)
uploaded_video_interface = AppleMusicUploadedVideoInterface(
base=base_interface,
quality=config.uploaded_video_quality,
ask_quality_function=interactive_prompts.ask_uploaded_video_quality_function,
)
interface = AppleMusicInterface(
song=song_interface,
music_video=music_video_interface,
uploaded_video=uploaded_video_interface,
artist_select_media_type_function=interactive_prompts.ask_artist_media_type,
artist_select_items_function=interactive_prompts.ask_artist_select_items,
flat_filter_function=flat_filter,
)
base_downloader = AppleMusicBaseDownloader(
interface=interface,
output_path=config.output_path,
temp_path=config.temp_path,
nm3u8dlre_path=config.nm3u8dlre_path,
mp4decrypt_path=config.mp4decrypt_path,
ffmpeg_path=config.ffmpeg_path,
mp4box_path=config.mp4box_path,
wrapper_decrypt_ip=config.wrapper_decrypt_ip,
download_mode=config.download_mode,
album_folder_template=config.album_folder_template,
compilation_folder_template=config.compilation_folder_template,
no_album_folder_template=config.no_album_folder_template,
playlist_folder_template=config.playlist_folder_template,
single_disc_file_template=config.single_disc_file_template,
multi_disc_file_template=config.multi_disc_file_template,
no_album_file_template=config.no_album_file_template,
playlist_file_template=config.playlist_file_template,
date_tag_template=config.date_tag_template,
exclude_tags=config.exclude_tags,
truncate=config.truncate,
)
song_downloader = AppleMusicSongDownloader(
base=base_downloader,
)
music_video_downloader = AppleMusicMusicVideoDownloader(
base=base_downloader,
remux_mode=config.music_video_remux_mode,
remux_format=config.music_video_remux_format,
)
uploaded_video_downloader = AppleMusicUploadedVideoDownloader(
base=base_downloader,
)
downloader = AppleMusicDownloader(
song=song_downloader,
music_video=music_video_downloader,
uploaded_video=uploaded_video_downloader,
overwrite=config.overwrite,
save_cover=config.save_cover,
save_playlist=config.save_playlist,
no_synced_lyrics=config.no_synced_lyrics,
synced_lyrics_only=config.synced_lyrics_only,
)
if config.read_urls_as_txt:
urls_from_file = []
for url in config.urls:
if Path(url).is_file() and Path(url).exists():
urls_from_file.extend(
[
line.strip()
for line in Path(url).read_text(encoding="utf-8").splitlines()
if line.strip()
]
)
urls = urls_from_file
else:
urls = config.urls
error_count = 0
for url_index, url in enumerate(urls, 1):
url_log = logger.bind(action=f"URL {url_index:>3}/{len(urls):<3}")
url_log.info(f'Processing "{url}"')
try:
async for download_item in downloader.get_download_item_from_url(url):
media_index = download_item.media.index + 1
media_total = download_item.media.total or "-"
track_log = logger.bind(
action=f"Track {media_index:>3}/{media_total:<3}"
)
media_title = (
download_item.media.media_metadata["attributes"]["name"]
if download_item.media.media_metadata
and download_item.media.media_metadata.get("attributes", {}).get(
"name"
)
else "Unknown Title"
)
media_type = (
download_item.media.media_metadata["type"]
if download_item.media.media_metadata
else None
)
if download_item.media.partial and media_type in {
None,
"songs",
"library-songs",
"music-videos",
"library-music-videos",
"uploaded-videos",
}:
track_log.info(f'Downloading "{media_title}"')
try:
await downloader.download(download_item)
except (
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceArtistMediaTypeError,
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
GamdlInterfaceFlatFilterExcludedError,
) as e:
track_log.warning(f'Skipping "{media_title}": {e}')
continue
except Exception as e:
error_count += 1
track_log.exception(f'Error downloading "{media_title}"')
if (
database
and download_item.media.media_metadata
and download_item.final_path
):
database.add(
download_item.media.media_metadata["id"],
download_item.final_path,
)
except GamdlInterfaceUrlParseError as e:
url_log.error(f"{e}")
continue
except Exception as e:
url_log.exception(f'Error processing "{url}": {e}')
error_count += 1
continue
logger.info(f"Finished with {error_count} error(s)")
import asyncio
from functools import wraps
from pathlib import Path
import click
import colorama
import structlog
from dataclass_click import dataclass_click
from httpx import ConnectError
from .. import __version__
from ..api import AppleMusicApi
from ..downloader import (
AppleMusicBaseDownloader,
AppleMusicDownloader,
AppleMusicMusicVideoDownloader,
AppleMusicSongDownloader,
AppleMusicUploadedVideoDownloader,
GamdlDownloaderDependencyNotFoundError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderSyncedLyricsOnlyError,
)
from ..interface import (
AppleMusicBaseInterface,
AppleMusicInterface,
AppleMusicMusicVideoInterface,
AppleMusicSongInterface,
AppleMusicUploadedVideoInterface,
GamdlInterfaceArtistMediaTypeError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceFlatFilterExcludedError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceUrlParseError,
)
from .cli_config import CliConfig
from .config_file import ConfigFile
from .database import Database
from .interactive_prompts import InteractivePrompts
from .utils import CustomOutputWriter, custom_structlog_formatter, prompt_path
logger = structlog.get_logger(__name__)
def make_sync(func):
@wraps(func)
def wrapper(*args, **kwargs):
return asyncio.run(func(*args, **kwargs))
return wrapper
@click.command()
@click.help_option("-h", "--help")
@click.version_option(__version__, "-v", "--version")
@dataclass_click(CliConfig)
@ConfigFile.loader
@make_sync
async def main(config: CliConfig):
colorama.just_fix_windows_console()
log_output = CustomOutputWriter()
if config.log_file:
log_output.add_file(config.log_file)
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.ExceptionPrettyPrinter(),
custom_structlog_formatter,
],
logger_factory=structlog.PrintLoggerFactory(file=log_output),
wrapper_class=structlog.make_filtering_bound_logger(config.log_level),
)
logger.info(f"Starting Gamdl {__version__}")
if config.use_wrapper:
try:
apple_music_api = await AppleMusicApi.create_from_wrapper(
wrapper_account_url=config.wrapper_account_url,
language=config.language,
)
except ConnectError:
logger.critical(
"Could not connect to the wrapper account API. "
"Make sure the wrapper is running and the URL is correct."
)
return
else:
cookies_path = prompt_path(config.cookies_path)
apple_music_api = await AppleMusicApi.create_from_netscape_cookies(
cookies_path=cookies_path,
language=config.language,
)
if not apple_music_api.active_subscription:
logger.critical(
"No active Apple Music subscription found, you won't be able to download"
" anything"
)
return
if apple_music_api.account_restrictions:
logger.warning(
"Your account has content restrictions enabled, some content may not be"
" downloadable"
)
if (
any(not codec.is_legacy() for codec in config.song_codec_piority)
and not config.use_wrapper
):
logger.warning(
"You have chosen an experimental song codec "
"without enabling wrapper. "
"They're not guaranteed to work due to API limitations."
)
if config.database_path:
database = Database(config.database_path, config.overwrite)
flat_filter = database.flat_filter
else:
database = None
flat_filter = None
interactive_prompts = InteractivePrompts(
artist_auto_select=config.artist_auto_select,
)
base_interface = await AppleMusicBaseInterface.create(
apple_music_api=apple_music_api,
cover_format=config.cover_format,
cover_size=config.cover_size,
use_wrapper=config.use_wrapper,
wrapper_m3u8_ip=config.wrapper_m3u8_ip,
wvd_path=config.wvd_path,
)
song_interface = AppleMusicSongInterface(
base=base_interface,
synced_lyrics_format=config.synced_lyrics_format,
codec_priority=config.song_codec_piority,
use_album_date=config.use_album_date,
skip_stream_info=config.synced_lyrics_only,
ask_codec_function=interactive_prompts.ask_song_codec,
)
music_video_interface = AppleMusicMusicVideoInterface(
base=base_interface,
resolution=config.music_video_resolution,
codec_priority=config.music_video_codec_priority,
ask_video_codec_function=interactive_prompts.ask_music_video_video_codec_function,
ask_audio_codec_function=interactive_prompts.ask_music_video_audio_codec_function,
)
uploaded_video_interface = AppleMusicUploadedVideoInterface(
base=base_interface,
quality=config.uploaded_video_quality,
ask_quality_function=interactive_prompts.ask_uploaded_video_quality_function,
)
interface = AppleMusicInterface(
song=song_interface,
music_video=music_video_interface,
uploaded_video=uploaded_video_interface,
artist_select_media_type_function=interactive_prompts.ask_artist_media_type,
artist_select_items_function=interactive_prompts.ask_artist_select_items,
flat_filter_function=flat_filter,
)
base_downloader = AppleMusicBaseDownloader(
interface=interface,
output_path=config.output_path,
temp_path=config.temp_path,
nm3u8dlre_path=config.nm3u8dlre_path,
mp4decrypt_path=config.mp4decrypt_path,
ffmpeg_path=config.ffmpeg_path,
mp4box_path=config.mp4box_path,
wrapper_decrypt_url=config.wrapper_decrypt_url,
download_mode=config.download_mode,
album_folder_template=config.album_folder_template,
compilation_folder_template=config.compilation_folder_template,
no_album_folder_template=config.no_album_folder_template,
playlist_folder_template=config.playlist_folder_template,
single_disc_file_template=config.single_disc_file_template,
multi_disc_file_template=config.multi_disc_file_template,
no_album_file_template=config.no_album_file_template,
playlist_file_template=config.playlist_file_template,
date_tag_template=config.date_tag_template,
exclude_tags=config.exclude_tags,
truncate=config.truncate,
)
song_downloader = AppleMusicSongDownloader(
base=base_downloader,
)
music_video_downloader = AppleMusicMusicVideoDownloader(
base=base_downloader,
remux_mode=config.music_video_remux_mode,
remux_format=config.music_video_remux_format,
)
uploaded_video_downloader = AppleMusicUploadedVideoDownloader(
base=base_downloader,
)
downloader = AppleMusicDownloader(
song=song_downloader,
music_video=music_video_downloader,
uploaded_video=uploaded_video_downloader,
overwrite=config.overwrite,
save_cover=config.save_cover,
save_playlist=config.save_playlist,
no_synced_lyrics=config.no_synced_lyrics,
synced_lyrics_only=config.synced_lyrics_only,
)
if config.read_urls_as_txt:
urls_from_file = []
for url in config.urls:
if Path(url).is_file() and Path(url).exists():
urls_from_file.extend(
[
line.strip()
for line in Path(url).read_text(encoding="utf-8").splitlines()
if line.strip()
]
)
urls = urls_from_file
else:
urls = config.urls
error_count = 0
for url_index, url in enumerate(urls, 1):
url_log = logger.bind(action=f"URL {url_index:>3}/{len(urls):<3}")
url_log.info(f'Processing "{url}"')
try:
async for download_item in downloader.get_download_item_from_url(url):
media_index = download_item.media.index + 1
media_total = download_item.media.total or "-"
track_log = logger.bind(
action=f"Track {media_index:>3}/{media_total:<3}"
)
media_title = (
download_item.media.media_metadata["attributes"]["name"]
if download_item.media.media_metadata
and download_item.media.media_metadata.get("attributes", {}).get(
"name"
)
else "Unknown Title"
)
media_type = (
download_item.media.media_metadata["type"]
if download_item.media.media_metadata
else None
)
if download_item.media.partial and media_type in {
None,
"songs",
"library-songs",
"music-videos",
"library-music-videos",
"uploaded-videos",
}:
track_log.info(f'Downloading "{media_title}"')
try:
await downloader.download(download_item)
except (
GamdlInterfaceMediaNotStreamableError,
GamdlInterfaceFormatNotAvailableError,
GamdlInterfaceDecryptionNotAvailableError,
GamdlInterfaceArtistMediaTypeError,
GamdlDownloaderSyncedLyricsOnlyError,
GamdlDownloaderMediaFileExistsError,
GamdlDownloaderDependencyNotFoundError,
GamdlInterfaceFlatFilterExcludedError,
) as e:
track_log.warning(f'Skipping "{media_title}": {e}')
continue
except Exception as e:
error_count += 1
track_log.exception(f'Error downloading "{media_title}"')
if (
database
and download_item.media.media_metadata
and download_item.final_path
):
database.add(
download_item.media.media_metadata["id"],
download_item.final_path,
)
except GamdlInterfaceUrlParseError as e:
url_log.error(f"{e}")
continue
except Exception as e:
url_log.exception(f'Error processing "{url}": {e}')
error_count += 1
continue
logger.info(f"Finished with {error_count} error(s)")
+7 -7
View File
@@ -203,9 +203,9 @@ class CliConfig:
help=".wvd file path",
default=base_interface_create_sig.parameters["wvd_path"].default,
type=click.Path(
file_okay=False,
dir_okay=True,
writable=True,
file_okay=True,
dir_okay=False,
writable=False,
resolve_path=True,
),
),
@@ -344,12 +344,12 @@ class CliConfig:
default=base_downloader_sig.parameters["mp4box_path"].default,
),
]
wrapper_decrypt_ip: Annotated[
wrapper_decrypt_url: Annotated[
str,
option(
"--wrapper-decrypt-ip",
help="IP address and port for wrapper decryption",
default=base_downloader_sig.parameters["wrapper_decrypt_ip"].default,
"--wrapper-decrypt-url",
help="wrapper-v2 base URL for FairPlay decrypt (e.g. http://127.0.0.1:80 or host:port)",
default=base_downloader_sig.parameters["wrapper_decrypt_url"].default,
),
]
download_mode: Annotated[
+2071 -1921
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -27,7 +27,7 @@ class AppleMusicBaseDownloader:
mp4decrypt_path: str = "mp4decrypt",
ffmpeg_path: str = "ffmpeg",
mp4box_path: str = "MP4Box",
wrapper_decrypt_ip: str = "127.0.0.1:10020",
wrapper_decrypt_url: str = "http://127.0.0.1:80",
download_mode: DownloadMode = DownloadMode.YTDLP,
album_folder_template: str = "{album_artist}/{album}",
compilation_folder_template: str = "Compilations/{album}",
@@ -49,7 +49,7 @@ class AppleMusicBaseDownloader:
self.mp4decrypt_path = mp4decrypt_path
self.ffmpeg_path = ffmpeg_path
self.mp4box_path = mp4box_path
self.wrapper_decrypt_ip = wrapper_decrypt_ip
self.wrapper_decrypt_url = wrapper_decrypt_url
self.download_mode = download_mode
self.album_folder_template = album_folder_template
self.compilation_folder_template = compilation_folder_template
+181 -181
View File
@@ -1,181 +1,181 @@
from pathlib import Path
import structlog
from ..interface.enums import CoverFormat
from ..interface.types import AppleMusicMedia, DecryptionKeyAv
from .amdecrypt import decrypt_file, decrypt_file_hex
from .base import AppleMusicBaseDownloader
from .types import DownloadItem
logger = structlog.get_logger(__name__)
class AppleMusicSongDownloader:
def __init__(
self,
base: AppleMusicBaseDownloader,
):
self.base = base
async def get_download_item(self, media: AppleMusicMedia) -> DownloadItem:
download_item = DownloadItem(media)
if media.stream_info:
download_item.staged_path = self.base.get_temp_path(
media.media_metadata["id"],
download_item.uuid_,
"staged",
"." + media.stream_info.file_format.value,
)
download_item.final_path = self.base.get_final_path(
media.tags,
".m4a",
media.playlist_tags,
)
if media.playlist_tags:
download_item.playlist_file_path = self.base.get_playlist_file_path(
media.playlist_tags,
)
download_item.synced_lyrics_path = self.get_synced_lyrics_path(
download_item.final_path
)
download_item.cover_path = self.get_cover_path(
download_item.final_path,
media.cover.file_extension,
)
return download_item
async def _decrypt_amdecrypt(
self,
input_path: str,
output_path: str,
media_id: str,
fairplay_key: str,
) -> None:
await decrypt_file(
self.base.wrapper_decrypt_ip,
media_id,
fairplay_key,
input_path,
output_path,
)
async def _decrypt_amdecrypt_hex(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool = False,
) -> None:
await decrypt_file_hex(
input_path,
output_path,
decryption_key,
legacy=legacy,
)
async def stage(
self,
encrypted_path: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
legacy: bool,
media_id: str,
fairplay_key: str,
):
log = logger.bind(
action="stage_song",
media_id=media_id,
encrypted_path=encrypted_path,
staged_path=staged_path,
)
if self.base.interface.base.use_wrapper and not legacy:
await self._decrypt_amdecrypt(
encrypted_path,
staged_path,
media_id,
fairplay_key,
)
else:
await self._decrypt_amdecrypt_hex(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
legacy,
)
log.debug("success")
def get_synced_lyrics_path(self, final_path: str) -> str:
log = logger.bind(action="get_synced_lyrics_path", final_path=final_path)
synced_lyrics_path = str(
Path(final_path).with_suffix(
"." + self.base.interface.song.synced_lyrics_format.value
)
)
log.debug("success", synced_lyrics_path=synced_lyrics_path)
return synced_lyrics_path
def get_cover_path(
self,
final_path: str,
file_extension: str,
) -> str:
log = logger.bind(
action="get_song_cover_path",
final_path=final_path,
file_extension=file_extension,
)
cover_path = str(Path(final_path).parent / ("Cover" + file_extension))
log.debug("success", cover_path=cover_path)
return cover_path
async def download(
self,
download_item: DownloadItem,
) -> None:
encrypted_path = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"encrypted",
".m4a",
)
await self.base.download_stream(
download_item.media.stream_info.audio_track.stream_url,
encrypted_path,
)
await self.stage(
encrypted_path,
download_item.staged_path,
download_item.media.decryption_key,
download_item.media.stream_info.audio_track.legacy,
download_item.media.media_metadata["id"],
download_item.media.stream_info.audio_track.fairplay_key,
)
cover_bytes = (
await self.base.interface.base.get_cover_bytes(
download_item.media.cover.url
)
if self.base.interface.base.cover_format != CoverFormat.RAW
else None
)
await self.base.apply_tags(
download_item.staged_path,
download_item.media.tags,
cover_bytes,
)
from pathlib import Path
import structlog
from ..interface.enums import CoverFormat
from ..interface.types import AppleMusicMedia, DecryptionKeyAv
from .amdecrypt import decrypt_file, decrypt_file_hex
from .base import AppleMusicBaseDownloader
from .types import DownloadItem
logger = structlog.get_logger(__name__)
class AppleMusicSongDownloader:
def __init__(
self,
base: AppleMusicBaseDownloader,
):
self.base = base
async def get_download_item(self, media: AppleMusicMedia) -> DownloadItem:
download_item = DownloadItem(media)
if media.stream_info:
download_item.staged_path = self.base.get_temp_path(
media.media_metadata["id"],
download_item.uuid_,
"staged",
"." + media.stream_info.file_format.value,
)
download_item.final_path = self.base.get_final_path(
media.tags,
".m4a",
media.playlist_tags,
)
if media.playlist_tags:
download_item.playlist_file_path = self.base.get_playlist_file_path(
media.playlist_tags,
)
download_item.synced_lyrics_path = self.get_synced_lyrics_path(
download_item.final_path
)
download_item.cover_path = self.get_cover_path(
download_item.final_path,
media.cover.file_extension,
)
return download_item
async def _decrypt_amdecrypt(
self,
input_path: str,
output_path: str,
media_id: str,
fairplay_key: str,
) -> None:
await decrypt_file(
self.base.wrapper_decrypt_url,
media_id,
fairplay_key,
input_path,
output_path,
)
async def _decrypt_amdecrypt_hex(
self,
input_path: str,
output_path: str,
decryption_key: str,
legacy: bool = False,
) -> None:
await decrypt_file_hex(
input_path,
output_path,
decryption_key,
legacy=legacy,
)
async def stage(
self,
encrypted_path: str,
staged_path: str,
decryption_key: DecryptionKeyAv,
legacy: bool,
media_id: str,
fairplay_key: str,
):
log = logger.bind(
action="stage_song",
media_id=media_id,
encrypted_path=encrypted_path,
staged_path=staged_path,
)
if self.base.interface.base.use_wrapper and not legacy:
await self._decrypt_amdecrypt(
encrypted_path,
staged_path,
media_id,
fairplay_key,
)
else:
await self._decrypt_amdecrypt_hex(
encrypted_path,
staged_path,
decryption_key.audio_track.key,
legacy,
)
log.debug("success")
def get_synced_lyrics_path(self, final_path: str) -> str:
log = logger.bind(action="get_synced_lyrics_path", final_path=final_path)
synced_lyrics_path = str(
Path(final_path).with_suffix(
"." + self.base.interface.song.synced_lyrics_format.value
)
)
log.debug("success", synced_lyrics_path=synced_lyrics_path)
return synced_lyrics_path
def get_cover_path(
self,
final_path: str,
file_extension: str,
) -> str:
log = logger.bind(
action="get_song_cover_path",
final_path=final_path,
file_extension=file_extension,
)
cover_path = str(Path(final_path).parent / ("Cover" + file_extension))
log.debug("success", cover_path=cover_path)
return cover_path
async def download(
self,
download_item: DownloadItem,
) -> None:
encrypted_path = self.base.get_temp_path(
download_item.media.media_metadata["id"],
download_item.uuid_,
"encrypted",
".m4a",
)
await self.base.download_stream(
download_item.media.stream_info.audio_track.stream_url,
encrypted_path,
)
await self.stage(
encrypted_path,
download_item.staged_path,
download_item.media.decryption_key,
download_item.media.stream_info.audio_track.legacy,
download_item.media.media_metadata["id"],
download_item.media.stream_info.audio_track.fairplay_key,
)
cover_bytes = (
await self.base.interface.base.get_cover_bytes(
download_item.media.cover.url
)
if self.base.interface.base.cover_format != CoverFormat.RAW
else None
)
await self.base.apply_tags(
download_item.staged_path,
download_item.media.tags,
cover_bytes,
)
+541 -543
View File
File diff suppressed because it is too large Load Diff